From a8341d4edac259989d60f51fde511c9c7338aaeb Mon Sep 17 00:00:00 2001 From: Liebing Date: Thu, 15 Jan 2026 16:31:57 +0800 Subject: [PATCH 1/2] [server] Update tablet server metadata cache when TableRegistration changed --- .../org/apache/fluss/metadata/TableInfo.java | 17 -- .../paimon/LakeEnabledTableCreateITCase.java | 36 +++ .../CoordinatorEventProcessor.java | 44 ++++ .../coordinator/CoordinatorService.java | 5 - .../server/coordinator/MetadataManager.java | 23 +- .../event/TableRegistrationChangeEvent.java | 67 +++++ .../event/watcher/TableChangeWatcher.java | 17 +- .../apache/fluss/server/log/LogTablet.java | 2 - .../apache/fluss/server/replica/Replica.java | 26 ++ .../fluss/server/replica/ReplicaManager.java | 20 ++ .../CoordinatorEventProcessorTest.java | 67 ++++- .../coordinator/TableManagerITCase.java | 33 +-- .../event/watcher/TableChangeWatcherTest.java | 64 ++++- .../server/log/remote/RemoteLogITCase.java | 241 ++++++++++++++++++ .../fluss/server/replica/ReplicaTest.java | 22 ++ .../server/testutils/RpcMessageTestUtils.java | 20 +- 16 files changed, 633 insertions(+), 71 deletions(-) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index eba41e0788..be894ee790 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -340,23 +340,6 @@ public static TableInfo of( modifiedTime); } - /** Replace a TableInfo with a new SchemaInfo. */ - public TableInfo withNewSchema(SchemaInfo schemaInfo) { - return new TableInfo( - tablePath, - tableId, - schemaInfo.getSchemaId(), - schemaInfo.getSchema(), - bucketKeys, - partitionKeys, - numBuckets, - properties, - customProperties, - comment, - createdTime, - modifiedTime); - } - @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 6fb7003223..cf6def30fd 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -27,9 +27,11 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.DataTypes; @@ -59,6 +61,7 @@ import javax.annotation.Nullable; import java.nio.file.Files; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -72,6 +75,7 @@ import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; +import static 
org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -584,6 +588,7 @@ void testAlterLakeEnabledLogTable() throws Exception { .build(); TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter"); admin.createTable(logTablePath, logTable, false).get(); + long tableId = admin.getTableInfo(logTablePath).get().getTableId(); assertThatThrownBy( () -> @@ -591,6 +596,9 @@ void testAlterLakeEnabledLogTable() throws Exception { Identifier.create(DATABASE, logTablePath.getTableName()))) .isInstanceOf(Catalog.TableNotExistException.class); + // verify LogTablet datalake status is initially disabled + verifyLogTabletDataLakeEnabled(tableId, false); + // enable lake TableChange.SetOption enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); @@ -598,6 +606,9 @@ void testAlterLakeEnabledLogTable() throws Exception { admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is enabled + verifyLogTabletDataLakeEnabled(tableId, true); + Identifier paimonTablePath = Identifier.create(DATABASE, logTablePath.getTableName()); Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath); @@ -635,11 +646,17 @@ void testAlterLakeEnabledLogTable() throws Exception { // paimon table should still exist although lake is disabled paimonCatalog.getTable(paimonTablePath); + // verify LogTablet datalake status is disabled + verifyLogTabletDataLakeEnabled(tableId, false); + // try to enable lake table again enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); changes = Collections.singletonList(enableLake); admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is enabled again + verifyLogTabletDataLakeEnabled(tableId, true); + // write some data to the lake table writeData(paimonCatalog.getTable(paimonTablePath)); Optional snapshot = paimonCatalog.getTable(paimonTablePath).latestSnapshot(); @@ -649,10 +666,16 @@ void testAlterLakeEnabledLogTable() throws Exception { changes = Collections.singletonList(disableLake); admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is disabled again + verifyLogTabletDataLakeEnabled(tableId, false); + // try to enable lake table again, the snapshot should not change changes = Collections.singletonList(enableLake); admin.alterTable(logTablePath, changes, false).get(); assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot); + + // verify LogTablet datalake status is enabled + verifyLogTabletDataLakeEnabled(tableId, true); } @Test @@ -1021,6 +1044,19 @@ private void verifyPaimonTable( assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment()); } + private void verifyLogTabletDataLakeEnabled(long tableId, boolean isDataLakeEnabled) { + for (int bucket = 0; bucket < BUCKET_NUM; bucket++) { + TableBucket tb = new TableBucket(tableId, bucket); + retry( + Duration.ofMinutes(1), + () -> { + Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + assertThat(replica.getLogTablet().isDataLakeEnabled()) + .isEqualTo(isDataLakeEnabled); + }); + } + } + private TableDescriptor createTableDescriptor( int columnNum, int bucketNum, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 
55b59753c7..7a86e3f764 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -85,6 +85,7 @@ import org.apache.fluss.server.coordinator.event.RebalanceEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.rebalance.RebalanceManager; @@ -560,6 +561,8 @@ public void process(CoordinatorEvent event) { } else if (event instanceof SchemaChangeEvent) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; processSchemaChange(schemaChangeEvent); + } else if (event instanceof TableRegistrationChangeEvent) { + processTableRegistrationChange((TableRegistrationChangeEvent) event); } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) { processNotifyLeaderAndIsrResponseReceivedEvent( (NotifyLeaderAndIsrResponseReceivedEvent) event); @@ -720,6 +723,47 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) { null); } + private void processTableRegistrationChange(TableRegistrationChangeEvent event) { + TablePath tablePath = event.getTablePath(); + Long tableId = coordinatorContext.getTableIdByPath(tablePath); + + // Skip if the table is not yet registered in coordinator context. + // Should not happen in normal cases. + if (tableId == null) { + return; + } + + TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId); + + TableInfo newTableInfo = + event.getTableRegistration().toTableInfo(tablePath, oldTableInfo.getSchemaInfo()); + coordinatorContext.putTableInfo(newTableInfo); + postAlterTableProperties(oldTableInfo, newTableInfo); + + // Notify tablet servers about the metadata change + updateTabletServerMetadataCache( + new HashSet<>(coordinatorContext.getLiveTabletServers().values()), + tableId, + null, + null); + } + + private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) { + boolean toEnableDataLake = + !oldTableInfo.getTableConfig().isDataLakeEnabled() + && newTableInfo.getTableConfig().isDataLakeEnabled(); + boolean toDisableDataLake = + oldTableInfo.getTableConfig().isDataLakeEnabled() + && !newTableInfo.getTableConfig().isDataLakeEnabled(); + + if (toEnableDataLake) { + // if the table is a lake table, we need to add it to the lake table tiering manager + lakeTableTieringManager.addNewLakeTable(newTableInfo); + } else if (toDisableDataLake) { + lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId()); + } + } + private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { long partitionId = createPartitionEvent.getPartitionId(); // skip the partition if it already exists diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 94d56dd338..a9cd5b3c5f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -378,8 +378,6 @@ public CompletableFuture alterTable(AlterTableRequest reques + "table 
properties or table schema."); } - LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer = - lakeCatalogDynamicLoader.getLakeCatalogContainer(); LakeCatalog.Context lakeCatalogContext = new DefaultLakeCatalogContext(false, currentSession().getPrincipal()); @@ -388,7 +386,6 @@ public CompletableFuture alterTable(AlterTableRequest reques tablePath, alterSchemaChanges, request.isIgnoreIfNotExists(), - lakeCatalogContainer.getLakeCatalog(), lakeCatalogContext); } @@ -398,8 +395,6 @@ public CompletableFuture alterTable(AlterTableRequest reques alterTableConfigChanges, tablePropertyChanges, request.isIgnoreIfNotExists(), - lakeCatalogContainer.getLakeCatalog(), - lakeTableTieringManager, lakeCatalogContext); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index f728e91da5..9f23979701 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -327,7 +327,6 @@ public void alterTableSchema( TablePath tablePath, List schemaChanges, boolean ignoreIfNotExists, - @Nullable LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) throws TableNotExistException, TableNotPartitionedException { try { @@ -339,8 +338,7 @@ public void alterTableSchema( Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); // Lake First: sync to Lake before updating Fluss schema - syncSchemaChangesToLake( - tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext); + syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext); // Update Fluss schema (ZK) after Lake sync succeeds if (!newSchema.equals(table.getSchema())) { @@ -370,12 +368,13 @@ private void syncSchemaChangesToLake( TablePath tablePath, TableInfo tableInfo, List schemaChanges, - @Nullable LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) { if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) { return; } + LakeCatalog lakeCatalog = + lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); if (lakeCatalog == null) { throw new InvalidAlterTableException( "Cannot alter schema for datalake enabled table " @@ -399,8 +398,6 @@ public void alterTableProperties( List tableChanges, TablePropertyChanges tablePropertyChanges, boolean ignoreIfNotExists, - @Nullable LakeCatalog lakeCatalog, - LakeTableTieringManager lakeTableTieringManager, LakeCatalog.Context lakeCatalogContext) { try { // it throws TableNotExistException if the table or database not exists @@ -431,22 +428,12 @@ public void alterTableProperties( tableDescriptor, newDescriptor, tableChanges, - lakeCatalog, lakeCatalogContext); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( newDescriptor.getProperties(), newDescriptor.getCustomProperties()); zookeeperClient.updateTable(tablePath, updatedTableRegistration); - - // post alter table properties, e.g. 
add the table to lake table tiering manager if - // it's to enable datalake for the table - postAlterTableProperties( - tablePath, - schemaInfo, - tableDescriptor, - updatedTableRegistration, - lakeTableTieringManager); } else { LOG.info( "No properties changed when alter table {}, skip update table.", tablePath); @@ -471,8 +458,10 @@ private void preAlterTableProperties( TableDescriptor tableDescriptor, TableDescriptor newDescriptor, List tableChanges, - LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) { + LakeCatalog lakeCatalog = + lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); + if (isDataLakeEnabled(newDescriptor)) { if (lakeCatalog == null) { throw new InvalidAlterTableException( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java new file mode 100644 index 0000000000..bf8c610a8c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.data.TableRegistration; + +import java.util.Objects; + +/** An event for table registration change. 
*/ +public class TableRegistrationChangeEvent implements CoordinatorEvent { + private final TablePath tablePath; + private final TableRegistration tableRegistration; + + public TableRegistrationChangeEvent(TablePath tablePath, TableRegistration tableRegistration) { + this.tablePath = tablePath; + this.tableRegistration = tableRegistration; + } + + public TablePath getTablePath() { + return tablePath; + } + + public TableRegistration getTableRegistration() { + return tableRegistration; + } + + @Override + public String toString() { + return "TableRegistrationChangeEvent{" + + "tablePath=" + + tablePath + + ", tableRegistration=" + + tableRegistration + + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TableRegistrationChangeEvent that = (TableRegistrationChangeEvent) o; + return Objects.equals(tablePath, that.tablePath) + && Objects.equals(tableRegistration, that.tableRegistration); + } + + @Override + public int hashCode() { + return Objects.hash(tablePath, tableRegistration); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index 09c273e7d0..5b4821f194 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -30,6 +30,7 @@ import org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.EventManager; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; @@ -127,7 +128,16 @@ public void event(Type type, ChildData oldData, ChildData newData) { if (tablePath == null) { break; } - processCreateTable(tablePath, newData); + // Distinguish between table creation and properties change. + // If oldData exists and contains valid table registration data, + // it's a properties change; otherwise, it's a table creation. 
+ if (oldData != null + && oldData.getData() != null + && oldData.getData().length > 0) { + processTableRegistrationChange(tablePath, newData); + } else { + processCreateTable(tablePath, newData); + } } break; } @@ -241,6 +251,11 @@ private void processCreatePartition( new CreatePartitionEvent( tablePath, tableId, partitionId, partitionName, partitionAssignment)); } + + private void processTableRegistrationChange(TablePath tablePath, ChildData newData) { + TableRegistration newTable = TableZNode.decode(newData.getData()); + eventManager.put(new TableRegistrationChangeEvent(tablePath, newTable)); + } } private void processSchemaChange(TablePath tablePath, int schemaId) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index a4d7668f9d..3f333dc10f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -529,8 +529,6 @@ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) { public void updateLakeTableSnapshotId(long snapshotId) { if (snapshotId > this.lakeTableSnapshotId) { this.lakeTableSnapshotId = snapshotId; - // it means the data lake is enabled if we have got the snapshot id - this.isDataLakeEnabled = true; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e0f3d3933d..50fea1275b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -560,6 +560,32 @@ public void updateLeaderEndOffsetSnapshot() { logTablet.updateLeaderEndOffsetSnapshot(); } + public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) { + boolean old = logTablet.isDataLakeEnabled(); + if (old == isDataLakeEnabled) { + return; + } + + logTablet.updateIsDataLakeEnabled(isDataLakeEnabled); + + if (isLeader()) { + if (isDataLakeEnabled) { + registerLakeTieringMetrics(); + } else { + if (lakeTieringMetricGroup != null) { + lakeTieringMetricGroup.close(); + lakeTieringMetricGroup = null; + } + } + } + + LOG.info( + "Replica for {} isDataLakeEnabled changed from {} to {}", + tableBucket, + old, + isDataLakeEnabled); + } + private void createKv() { try { // create a closeable registry for the closable related to kv diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 421db4a068..75cdb2ace7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -84,6 +84,7 @@ import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.log.remote.RemoteLogManager; import org.apache.fluss.server.metadata.ClusterMetadata; +import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metrics.UserMetrics; import org.apache.fluss.server.metrics.group.BucketMetricGroup; @@ -459,9 +460,28 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust // check or apply coordinator epoch. 
validateAndApplyCoordinatorEpoch(coordinatorEpoch, "updateMetadataCache"); metadataCache.updateClusterMetadata(clusterMetadata); + updateReplicaTableConfig(clusterMetadata); }); } + private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { + for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { + TableInfo tableInfo = tableMetadata.getTableInfo(); + long tableId = tableInfo.getTableId(); + boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled(); + + for (Map.Entry entry : allReplicas.entrySet()) { + HostedReplica hostedReplica = entry.getValue(); + if (hostedReplica instanceof OnlineReplica) { + Replica replica = ((OnlineReplica) hostedReplica).getReplica(); + if (replica.getTableBucket().getTableId() == tableId) { + replica.updateIsDataLakeEnabled(dataLakeEnabled); + } + } + } + } + } + /** * Append log records to leader replicas of the buckets, and wait for them to be replicated to * other replicas. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index dfb4088e49..d4ec8f8db2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -53,6 +53,7 @@ import org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.CommitRemoteLogManifestData; +import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; import org.apache.fluss.server.metadata.BucketMetadata; @@ -906,6 +907,70 @@ void testSchemaChange() throws Exception { 3, new TableMetadata(tableInfo2, Collections.emptyList()))); } + @Test + void testTableRegistrationChange() throws Exception { + // make sure all request to gateway should be successful + initCoordinatorChannel(); + + // create a table + TablePath t1 = TablePath.of(defaultDatabase, "test_table_registration_change"); + int nBuckets = 1; + int replicationFactor = 3; + TableAssignment tableAssignment = + generateAssignment( + nBuckets, + replicationFactor, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + // create table + List replicas = tableAssignment.getBucketAssignment(0).getReplicas(); + metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + TableInfo tableInfo = metadataManager.getTable(t1); + + retry( + Duration.ofMinutes(1), + () -> + verifyMetadataUpdateRequest( + 3, + new TableMetadata( + tableInfo, + Collections.singletonList( + new BucketMetadata( + 0, replicas.get(0), 0, replicas))))); + + // alter table properties (custom property) + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setCustomProperty("custom.key", "custom.value"); + TablePropertyChanges tablePropertyChanges = builder.build(); + metadataManager.alterTableProperties( + t1, Collections.emptyList(), tablePropertyChanges, false, null); + + // get updated table info and verify metadata update request is sent + TableInfo updatedTableInfo = metadataManager.getTable(t1); + assertThat(updatedTableInfo.getCustomProperties().toMap()) + .containsEntry("custom.key", 
"custom.value"); + + retry( + Duration.ofMinutes(1), + () -> + verifyMetadataUpdateRequest( + 3, new TableMetadata(updatedTableInfo, Collections.emptyList()))); + + // verify the table info in coordinator context is updated + retryVerifyContext( + ctx -> { + Long tableId = ctx.getTableIdByPath(t1); + assertThat(tableId).isNotNull(); + TableInfo tableInfoInCtx = ctx.getTableInfoById(tableId); + assertThat(tableInfoInCtx).isNotNull(); + assertThat(tableInfoInCtx.getCustomProperties().toMap()) + .containsEntry("custom.key", "custom.value"); + }); + } + @Test void testDoBucketReassignment() throws Exception { zookeeperClient.registerTabletServer( @@ -1305,7 +1370,7 @@ private long createTable(TablePath tablePath, TabletServerInfo[] servers) { } private void alterTable(TablePath tablePath, List schemaChanges) { - metadataManager.alterTableSchema(tablePath, schemaChanges, true, null, null); + metadataManager.alterTableSchema(tablePath, schemaChanges, true, null); } private TableDescriptor getPartitionedTable() { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index f21d5bdd86..155b2c4fe6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -23,7 +23,6 @@ import org.apache.fluss.config.AutoPartitionTimeUnit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -51,7 +50,6 @@ import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbAddColumn; -import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbBucketMetadata; import org.apache.fluss.rpc.messages.PbPartitionMetadata; import org.apache.fluss.rpc.messages.PbServerNode; @@ -303,7 +301,8 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(setProperties, resetProperties), + setProperties, + resetProperties, Collections.emptyList(), false)) .get(); @@ -679,7 +678,11 @@ void testSchemaEvolution() throws Exception { adminGateway .alterTable( newAlterTableRequest( - tablePath, Collections.emptyList(), alterTableAddColumns(), false)) + tablePath, + Collections.emptyMap(), + Collections.emptyList(), + alterTableAddColumns(), + false)) .get(); // restart coordinatorServer @@ -820,28 +823,6 @@ private static TableDescriptor newPkTable() { .build(); } - private static List alterTableProperties( - Map setProperties, List resetProperties) { - List res = new ArrayList<>(); - - for (Map.Entry entry : setProperties.entrySet()) { - PbAlterConfig info = new PbAlterConfig(); - info.setConfigKey(entry.getKey()); - info.setConfigValue(entry.getValue()); - info.setOpType(AlterConfigOpType.SET.value()); - res.add(info); - } - - for (String resetProperty : resetProperties) { - PbAlterConfig info = new PbAlterConfig(); - info.setConfigKey(resetProperty); - info.setOpType(AlterConfigOpType.DELETE.value()); - res.add(info); - } - - return res; - } - private static List alterTableAddColumns() { 
List addColumns = new ArrayList<>(); PbAddColumn newNestedColumn = new PbAddColumn(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 691f0d9ce3..76634b970c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -35,12 +35,15 @@ import org.apache.fluss.server.coordinator.event.DropPartitionEvent; import org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.coordinator.event.TestingEventManager; +import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; @@ -309,7 +312,6 @@ void testSchemaChanges() { null, TableChange.ColumnPosition.last())), false, - null, null); Schema newSchema = Schema.newBuilder() @@ -329,4 +331,64 @@ void testSchemaChanges() { assertThat(eventManager.getEvents()) .containsExactlyInAnyOrderElementsOf(allEvents)); } + + @Test + void testTableRegistrationChange() { + // create a table + TablePath tablePath = TablePath.of(DEFAULT_DB, "table_registration_change"); + TableAssignment tableAssignment = + generateAssignment( + 3, + 3, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + long tableId = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); + long currentMillis = System.currentTimeMillis(); + + List expectedEvents = new ArrayList<>(); + expectedEvents.add( + new CreateTableEvent( + TableInfo.of( + tablePath, + tableId, + schemaInfo.getSchemaId(), + TEST_TABLE, + currentMillis, + currentMillis), + tableAssignment)); + expectedEvents.add(new SchemaChangeEvent(tablePath, schemaInfo)); + + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + + // alter table properties (custom property) + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setCustomProperty("custom.key", "custom.value"); + TablePropertyChanges tablePropertyChanges = builder.build(); + metadataManager.alterTableProperties( + tablePath, Collections.emptyList(), tablePropertyChanges, false, null); + + // get the updated table registration + TableRegistration updatedTableRegistration = + metadataManager.getTableRegistration(tablePath); + + // verify TableRegistrationChangeEvent is generated + expectedEvents.add(new TableRegistrationChangeEvent(tablePath, updatedTableRegistration)); + + metadataManager.dropTable(tablePath, false); + expectedEvents.add(new DropTableEvent(tableId, false, false)); + + retry( + Duration.ofMinutes(1), + () -> + 
assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java index 3187e5db58..274fa1a6c8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java @@ -22,17 +22,23 @@ import org.apache.fluss.config.MemorySize; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.log.FetchParams; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.tablet.TabletServer; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.FlussPaths; +import org.apache.fluss.utils.clock.ManualClock; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -45,10 +51,12 @@ import java.util.concurrent.CompletableFuture; import static org.apache.fluss.record.TestData.DATA1; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest; import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; @@ -58,11 +66,15 @@ /** ITCase for remote log. 
*/ public class RemoteLogITCase { + + private static final ManualClock MANUAL_CLOCK = new ManualClock(System.currentTimeMillis()); + @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder() .setNumOfTabletServers(3) .setClusterConf(initConfig()) + .setClock(MANUAL_CLOCK) .build(); private TableBucket setupTableBucket() throws Exception { @@ -202,6 +214,233 @@ void testFollowerFetchAlreadyMoveToRemoteLog(boolean withWriterId) throws Except FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower); } + @Test + void testRemoteLogTTLWithLakeDisabled() throws Exception { + // Create table without data lake enabled + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + // Set a short TTL for testing (2 hours) + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(2)) + .build(); + + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, tableDescriptor); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId); + + // Produce records to create remote log segments + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb); + + // Verify remote log segments exist + TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId); + RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager(); + RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb); + + int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + assertThat(initialSegmentCount).isGreaterThan(0); + long initialRemoteLogStartOffset = remoteLogTablet.getRemoteLogStartOffset(); + assertThat(initialRemoteLogStartOffset).isEqualTo(0L); + + // Verify data lake is disabled + Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + LogTablet logTablet = leaderReplica.getLogTablet(); + assertThat(logTablet.isDataLakeEnabled()).isFalse(); + + // Advance time past TTL (2 hours + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(2).plusMinutes(30)); + + // Wait for remote log segments to be cleaned up + // Since lake is disabled, expired segments should be deleted directly + retry( + Duration.ofMinutes(2), + () -> { + // Remote log segments should be deleted after TTL + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + // Remote log start offset should be reset + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + } + + @Test + void testRemoteLogTTLWithLakeEnabled() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_remote_log_ttl_lake"); + // Create table with data lake enabled + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + // Set a short TTL for testing (2 hours) + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(2)) + // Enable data lake + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId); 
+ + // Produce records to create remote log segments + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb); + + // Verify remote log segments exist + TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId); + RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager(); + RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb); + + int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + assertThat(initialSegmentCount).isGreaterThan(0); + + // Verify data lake is enabled + Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + LogTablet logTablet = leaderReplica.getLogTablet(); + assertThat(logTablet.isDataLakeEnabled()).isTrue(); + + // Record the remote log end offset before advancing time + long remoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L); + assertThat(remoteLogEndOffset).isGreaterThan(0L); + + // Advance time past TTL (2 hours + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(2).plusMinutes(30)); + + // Since lake is enabled but no data has been tiered to lake (lakeLogEndOffset = -1), + // the expired segments should NOT be deleted + assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(initialSegmentCount); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L); + + // Now simulate lake tiering by updating the lake log end offset + // This simulates that some segments have been tiered to lake + // Use a value that will cover at least one segment but not all + // Segments are deleted when segment.remoteLogEndOffset() <= lakeLogEndOffset + long partialLakeOffset = remoteLogEndOffset / 2; + // Ensure we have at least some offset to tier + if (partialLakeOffset < 10) { + partialLakeOffset = 10; + } + logTablet.updateLakeLogEndOffset(partialLakeOffset); + + final long expectedMinStartOffset = partialLakeOffset; + + // Wait for partial cleanup - only segments that have been tiered should be deleted + retry( + Duration.ofMinutes(2), + () -> { + // Some segments should be deleted (those that have been tiered) + int currentSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + assertThat(currentSegmentCount).isLessThan(initialSegmentCount); + // Remote log start offset should be updated to at least the tiered offset + assertThat(remoteLogTablet.getRemoteLogStartOffset()) + .isGreaterThanOrEqualTo(expectedMinStartOffset); + // Remaining segments should have remoteLogEndOffset > partialLakeOffset + assertThat(remoteLogTablet.allRemoteLogSegments()) + .allSatisfy( + segment -> + assertThat(segment.remoteLogEndOffset()) + .isGreaterThan(expectedMinStartOffset)); + }); + + // Now update lake log end offset to include all segments + logTablet.updateLakeLogEndOffset(remoteLogEndOffset); + + // Wait for all segments to be cleaned up + retry( + Duration.ofMinutes(2), + () -> { + // All segments should now be deleted + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + } + + @Test + void testDynamicLakeEnableAffectsTTL() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_dynamic_lake_ttl"); + // Create table without data lake enabled initially + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + // Set a short TTL for testing (1 hour) + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(1)) + .build(); + + long tableId = 
createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId); + + // Produce records to create remote log segments + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb); + + TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId); + RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager(); + RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb); + + int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + assertThat(initialSegmentCount).isGreaterThan(0); + + // Record the remote log end offset + long remoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L); + assertThat(remoteLogEndOffset).isGreaterThan(0L); + + Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + LogTablet logTablet = leaderReplica.getLogTablet(); + assertThat(logTablet.isDataLakeEnabled()).isFalse(); + + // Dynamically enable data lake using admin API + CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + coordinatorGateway + .alterTable( + newAlterTableRequest( + tablePath, + Collections.singletonMap( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"), + Collections.emptyList(), + Collections.emptyList(), + false)) + .get(); + retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isTrue()); + + // Advance time past TTL (1 hour + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30)); + + // Since lake is now enabled but no data has been tiered (lakeLogEndOffset = -1), + // the expired segments should NOT be deleted + assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(initialSegmentCount); + + // Now disable data lake using admin API + coordinatorGateway + .alterTable( + newAlterTableRequest( + tablePath, + Collections.singletonMap( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"), + Collections.emptyList(), + Collections.emptyList(), + false)) + .get(); + retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isFalse()); + + // Wait for cleanup - now segments should be deleted since lake is disabled + retry( + Duration.ofMinutes(2), + () -> { + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_BUCKET_NUMBER, 1); @@ -213,6 +452,8 @@ private static Configuration initConfig() { // set a shorter max log time to allow replica shrink from isr. 
Don't be too low, otherwise // normal follower synchronization will also be affected conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(5)); + + conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON); return conf; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index c8e26ea7b5..fd487a0bc1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -752,6 +752,28 @@ void testRestore(@TempDir Path snapshotKvTabletDirPath) throws Exception { verifyGetKeyValues(kvTablet, expectedKeyValues); } + @Test + void testUpdateIsDataLakeEnabled() throws Exception { + Replica logReplica = + makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1)); + makeLogReplicaAsLeader(logReplica); + + // initial state should be false + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse(); + + // update to true + logReplica.updateIsDataLakeEnabled(true); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue(); + + // update with same value should not change anything (no-op) + logReplica.updateIsDataLakeEnabled(true); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue(); + + // update to false + logReplica.updateIsDataLakeEnabled(false); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse(); + } + private void makeLogReplicaAsLeader(Replica replica) throws Exception { makeLeaderReplica( replica, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index b567eff1f7..5055105e95 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.testutils; +import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TableDescriptor; @@ -75,6 +76,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; @@ -147,9 +149,25 @@ public static CreateTableRequest newCreateTableRequest( public static AlterTableRequest newAlterTableRequest( TablePath tablePath, - List alterConfigs, + Map setProperties, + List resetProperties, List addColumns, boolean ignoreIfExists) { + List alterConfigs = new ArrayList<>(); + for (Map.Entry entry : setProperties.entrySet()) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(entry.getKey()); + info.setConfigValue(entry.getValue()); + info.setOpType(AlterConfigOpType.SET.value()); + alterConfigs.add(info); + } + for (String resetProperty : resetProperties) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(resetProperty); + info.setOpType(AlterConfigOpType.DELETE.value()); + alterConfigs.add(info); + } + AlterTableRequest request = new AlterTableRequest(); request.addAllConfigChanges(alterConfigs) .addAllAddColumns(addColumns) From 2c759d342ba063e38fd5fa608c54ab2c2361c609 Mon Sep 17 00:00:00 2001 From: Liebing Date: Wed, 21 Jan 2026 16:25:06 +0800 Subject: 
[PATCH 2/2] rebase and address yuxia's comments --- .../CoordinatorEventProcessor.java | 17 ++ .../coordinator/CoordinatorRequestBatch.java | 3 +- .../server/coordinator/MetadataManager.java | 42 ---- .../fluss/server/replica/ReplicaManager.java | 20 +- .../coordinator/LakeTableManagerITCase.java | 12 +- .../server/log/remote/RemoteLogITCase.java | 226 +++++++----------- 6 files changed, 121 insertions(+), 199 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 7a86e3f764..d0dd0de213 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -126,6 +126,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -730,6 +731,10 @@ private void processTableRegistrationChange(TableRegistrationChangeEvent event) // Skip if the table is not yet registered in coordinator context. // Should not happen in normal cases. if (tableId == null) { + LOG.warn( + "Table {} is not registered in coordinator context, " + + "skipping the table registration change.", + tablePath); return; } @@ -749,6 +754,7 @@ private void processTableRegistrationChange(TableRegistrationChangeEvent event) } private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) { + boolean dataLakeEnabled = newTableInfo.getTableConfig().isDataLakeEnabled(); boolean toEnableDataLake = !oldTableInfo.getTableConfig().isDataLakeEnabled() && newTableInfo.getTableConfig().isDataLakeEnabled(); @@ -761,7 +767,18 @@ public void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTable lakeTableTieringManager.addNewLakeTable(newTableInfo); } else if (toDisableDataLake) { lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId()); + } else if (dataLakeEnabled) { + // The table is still a lake table, check if freshness has changed + Duration oldFreshness = oldTableInfo.getTableConfig().getDataLakeFreshness(); + Duration newFreshness = newTableInfo.getTableConfig().getDataLakeFreshness(); + + // Only update the tiering manager when the freshness actually changed + if (!Objects.equals(oldFreshness, newFreshness)) { + lakeTableTieringManager.updateTableLakeFreshness( + newTableInfo.getTableId(), newFreshness.toMillis()); + } } + // more post-alter actions can be added here } private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 35527805c0..436cc52038 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -280,6 +280,7 @@ public void addStopReplicaRequestForTabletServers( *
  • case8: One newly tabletServer added into cluster *
  • case9: One tabletServer is removed from cluster *
  • case10: schemaId is changed after table is created. + *
• case11: TableRegistration changed after table is created. + * */ // todo: improve this with different phase enum. @@ -300,7 +301,7 @@ public void addUpdateMetadataRequestForTabletServers( .computeIfAbsent(tableId, k -> new HashMap<>()) .put(partitionId, Collections.emptyList()); } else { - // case3, case4, case10 + // case3, case4, case10, case11 updateMetadataRequestBucketMap.put(tableId, Collections.emptyList()); } } else { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 9f23979701..8d7b61a088 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -60,14 +60,12 @@ import javax.annotation.Nullable; -import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -500,41 +498,6 @@ private void preAlterTableProperties( } } - private void postAlterTableProperties( - TablePath tablePath, - SchemaInfo schemaInfo, - TableDescriptor oldTableDescriptor, - TableRegistration newTableRegistration, - LakeTableTieringManager lakeTableTieringManager) { - - boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties); - boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled; - boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled; - - if (toEnableDataLake) { - TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo); - // if the table is lake table, we need to add it to lake table tiering manager - lakeTableTieringManager.addNewLakeTable(newTableInfo); - } else if (toDisableDataLake) { - lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId); - } else if (dataLakeEnabled) { - // The table is still a lake table, check if freshness has changed - Duration oldFreshness = - Configuration.fromMap(oldTableDescriptor.getProperties()) - .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); - Duration newFreshness = - Configuration.fromMap(newTableRegistration.properties) - .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); - - // Check if freshness has changed - if (!Objects.equals(oldFreshness, newFreshness)) { - lakeTableTieringManager.updateTableLakeFreshness( - newTableRegistration.tableId, newFreshness.toMillis()); - } - } - // more post-alter actions can be added here - } - /** * Get a new TableDescriptor with updated properties. 
* @@ -576,11 +539,6 @@ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) { return Boolean.parseBoolean(dataLakeEnabledValue); } - private boolean isDataLakeEnabled(Map properties) { - String dataLakeEnabledValue = properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); - return Boolean.parseBoolean(dataLakeEnabledValue); - } - public void removeSensitiveTableOptions(Map tableLakeOptions) { if (tableLakeOptions == null || tableLakeOptions.isEmpty()) { return; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 75cdb2ace7..b5a5ecc54b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -467,15 +467,17 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { TableInfo tableInfo = tableMetadata.getTableInfo(); - long tableId = tableInfo.getTableId(); - boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled(); - - for (Map.Entry entry : allReplicas.entrySet()) { - HostedReplica hostedReplica = entry.getValue(); - if (hostedReplica instanceof OnlineReplica) { - Replica replica = ((OnlineReplica) hostedReplica).getReplica(); - if (replica.getTableBucket().getTableId() == tableId) { - replica.updateIsDataLakeEnabled(dataLakeEnabled); + if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) { + long tableId = tableInfo.getTableId(); + boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled(); + + for (Map.Entry entry : allReplicas.entrySet()) { + HostedReplica hostedReplica = entry.getValue(); + if (hostedReplica instanceof OnlineReplica) { + Replica replica = ((OnlineReplica) hostedReplica).getReplica(); + if (replica.getTableBucket().getTableId() == tableId) { + replica.updateIsDataLakeEnabled(dataLakeEnabled); + } } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index 2472d5041f..dd25e0ef14 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -156,7 +156,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(setProperties, new ArrayList<>()), + setProperties, + Collections.emptyList(), Collections.emptyList(), false)) .get(); @@ -185,7 +186,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(new HashMap<>(), resetProperties), + Collections.emptyMap(), + resetProperties, Collections.emptyList(), false)) .get(); @@ -214,7 +216,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(new HashMap<>(), resetProperties2), + Collections.emptyMap(), + resetProperties2, Collections.emptyList(), false)) .get(); @@ -280,7 +283,8 @@ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception { .alterTable( newAlterTableRequest( tablePath, - 
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index 2472d5041f..dd25e0ef14 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -156,7 +156,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(setProperties, new ArrayList<>()),
+                                setProperties,
+                                Collections.emptyList(),
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -185,7 +186,8 @@
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(new HashMap<>(), resetProperties),
+                                Collections.emptyMap(),
+                                resetProperties,
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -214,7 +216,8 @@
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(new HashMap<>(), resetProperties2),
+                                Collections.emptyMap(),
+                                resetProperties2,
                                 Collections.emptyList(),
                                 false))
                 .get();
@@ -280,7 +283,8 @@ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
-                                alterTableProperties(setProperties, new ArrayList<>()),
+                                setProperties,
+                                Collections.emptyList(),
                                 Collections.emptyList(),
                                 false))
                 .get();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
index 274fa1a6c8..90dfa0914e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
@@ -27,6 +27,7 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.remote.RemoteLogSegment;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
@@ -47,8 +48,11 @@
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -87,6 +91,11 @@ private TableBucket setupTableBucket() throws Exception {
 
     private void produceRecordsAndWaitRemoteLogCopy(
             TabletServerGateway leaderGateway, TableBucket tb) throws Exception {
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+    }
+
+    private void produceRecordsAndWaitRemoteLogCopy(
+            TabletServerGateway leaderGateway, TableBucket tb, long baseOffset) throws Exception {
         for (int i = 0; i < 10; i++) {
             assertProduceLogResponse(
                     leaderGateway
@@ -98,7 +107,7 @@ private void produceRecordsAndWaitRemoteLogCopy(
                                             genMemoryLogRecordsByObject(DATA1)))
                             .get(),
                     0,
-                    i * 10L);
+                    baseOffset + i * 10L);
         }
         FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
                 new TableBucket(tb.getTableId(), 0));
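One detail worth spelling out before the test changes below: produceRecordsAndWaitRemoteLogCopy writes 10 batches whose acknowledged base offsets advance by 10, so each call moves the log end offset forward by exactly 100 records. The staged test relies on this when it passes baseOffset values of 0, 100, and 200. A minimal sketch of the arithmetic:

// Sketch of the offset bookkeeping assumed by the staged TTL test below.
public class OffsetBookkeepingSketch {
    public static void main(String[] args) {
        long recordsPerCall = 10 * 10L;                 // 10 batches x 10 records
        long stageABase = 0L;                           // fresh table
        long stageBBase = stageABase + recordsPerCall;  // 100, after Stage A
        long stageDBase = stageBBase + recordsPerCall;  // 200, after Stages A and B
        System.out.println(stageBBase + ", " + stageDBase);
    }
}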
@@ -215,17 +224,19 @@ void testFollowerFetchAlreadyMoveToRemoteLog(boolean withWriterId) throws Exception {
     }
 
     @Test
-    void testRemoteLogTTLWithLakeDisabled() throws Exception {
+    void testRemoteLogTTLWithDynamicLakeToggle() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_remote_log_ttl_dynamic_lake");
+        // ==================== Stage A: Lake Disabled (Initial) ====================
         // Create table without data lake enabled
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
                         .schema(DATA1_SCHEMA)
                         .distributedBy(1)
-                        // Set a short TTL for testing (2 hours)
-                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(2))
+                        // Set a short TTL for testing (1 hour)
+                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(1))
                         .build();
 
-        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, tableDescriptor);
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
         TableBucket tb = new TableBucket(tableId, 0);
         FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
 
@@ -236,203 +247,132 @@ void testRemoteLogTTLWithLakeDisabled() throws Exception {
         // Produce records to create remote log segments
         produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb);
 
-        // Verify remote log segments exist
         TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
         RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
         RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
 
-        int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
-        assertThat(initialSegmentCount).isGreaterThan(0);
-        long initialRemoteLogStartOffset = remoteLogTablet.getRemoteLogStartOffset();
-        assertThat(initialRemoteLogStartOffset).isEqualTo(0L);
+        assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0);
+        assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L);
 
-        // Verify data lake is disabled
         Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
         LogTablet logTablet = leaderReplica.getLogTablet();
         assertThat(logTablet.isDataLakeEnabled()).isFalse();
 
-        // Advance time past TTL (2 hours + buffer)
-        MANUAL_CLOCK.advanceTime(Duration.ofHours(2).plusMinutes(30));
+        // Advance time past TTL (1 hour + buffer)
+        MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
 
-        // Wait for remote log segments to be cleaned up
         // Since lake is disabled, expired segments should be deleted directly
         retry(
                 Duration.ofMinutes(2),
                 () -> {
-                    // Remote log segments should be deleted after TTL
                     assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
-                    // Remote log start offset should be reset
                     assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
                 });
-    }
-
-    @Test
-    void testRemoteLogTTLWithLakeEnabled() throws Exception {
-        TablePath tablePath = TablePath.of("fluss", "test_remote_log_ttl_lake");
-        // Create table with data lake enabled
-        TableDescriptor tableDescriptor =
-                TableDescriptor.builder()
-                        .schema(DATA1_SCHEMA)
-                        .distributedBy(1)
-                        // Set a short TTL for testing (2 hours)
-                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(2))
-                        // Enable data lake
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
-                        .build();
-
-        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
-        TableBucket tb = new TableBucket(tableId, 0);
-        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
-
-        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
-        TabletServerGateway leaderGateway =
-                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
-
-        // Produce records to create remote log segments
-        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb);
-
-        // Verify remote log segments exist
-        TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
-        RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
-        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
-
-        int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
-        assertThat(initialSegmentCount).isGreaterThan(0);
-        // Verify data lake is enabled
-        Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
-        LogTablet logTablet = leaderReplica.getLogTablet();
-        assertThat(logTablet.isDataLakeEnabled()).isTrue();
+        // ==================== Stage B: Dynamic Enable & Retention ====================
+        // Dynamically enable data lake
+        CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+        coordinatorGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                Collections.singletonMap(
+                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                false))
+                .get();
+        retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isTrue());
 
-        // Record the remote log end offset before advancing time
-        long remoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L);
-        assertThat(remoteLogEndOffset).isGreaterThan(0L);
+        // Produce new data after enabling lake (baseOffset = 100 from Stage A)
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 100L);
+        retry(
+                Duration.ofMinutes(2),
+                () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
+        int stageBSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
+        long stageBRemoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L);
+        assertThat(stageBRemoteLogEndOffset).isGreaterThan(0L);
 
-        // Advance time past TTL (2 hours + buffer)
-        MANUAL_CLOCK.advanceTime(Duration.ofHours(2).plusMinutes(30));
+        // Advance time past TTL (1 hour + buffer)
+        MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
 
-        // Since lake is enabled but no data has been tiered to lake (lakeLogEndOffset = -1),
+        // Since lake is enabled but no data has been tiered (lakeLogEndOffset = -1),
         // the expired segments should NOT be deleted
-        assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(initialSegmentCount);
-        assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L);
-
-        // Now simulate lake tiering by updating the lake log end offset
-        // This simulates that some segments have been tiered to lake
-        // Use a value that will cover at least one segment but not all
-        // Segments are deleted when segment.remoteLogEndOffset() <= lakeLogEndOffset
-        long partialLakeOffset = remoteLogEndOffset / 2;
-        // Ensure we have at least some offset to tier
-        if (partialLakeOffset < 10) {
-            partialLakeOffset = 10;
-        }
+        assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(stageBSegmentCount);
+
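// ---- Sketch (illustration distilled from the test's own comments; not code
// ---- from this patch or from Fluss's RemoteLogManager): the TTL rule the
// ---- stages above and below exercise. An expired remote segment is deletable
// ---- immediately when the lake is disabled, and only once it has been tiered
// ---- (segment.remoteLogEndOffset() <= lakeLogEndOffset) when enabled.
final class RemoteSegmentTtlRuleSketch {
    static boolean canDelete(
            boolean expired, boolean dataLakeEnabled, long segmentEndOffset, long lakeLogEndOffset) {
        if (!expired) {
            return false; // TTL not reached yet
        }
        return !dataLakeEnabled || segmentEndOffset <= lakeLogEndOffset;
    }

    public static void main(String[] args) {
        // Stage B: lake enabled, nothing tiered yet (lakeLogEndOffset = -1) -> keep
        System.out.println(canDelete(true, true, 100L, -1L));  // false
        // Stages A and D: lake disabled -> expired segments are deleted directly
        System.out.println(canDelete(true, false, 100L, -1L)); // true
    }
}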
+        // ==================== Stage C: Lake Progress Update (Cleanup) ====================
+        // Step 1: Partially update lake log end offset to trigger partial cleanup
+        // Get segments sorted by remoteLogEndOffset and use the middle segment's end offset
+        List<RemoteLogSegment> sortedSegments =
+                remoteLogTablet.allRemoteLogSegments().stream()
+                        .sorted(Comparator.comparingLong(RemoteLogSegment::remoteLogEndOffset))
+                        .collect(Collectors.toList());
+        assertThat(sortedSegments.size()).isGreaterThanOrEqualTo(2);
+
+        // Use the end offset of a middle segment to ensure partial deletion
+        int midIndex = sortedSegments.size() / 2;
+        long partialLakeOffset = sortedSegments.get(midIndex).remoteLogEndOffset();
         logTablet.updateLakeLogEndOffset(partialLakeOffset);
 
-        final long expectedMinStartOffset = partialLakeOffset;
+        final int expectedRemainingSegments = sortedSegments.size() - midIndex - 1;
+        // The new remoteLogStartOffset should be the start offset of the first remaining segment
+        final long expectedNewStartOffset = sortedSegments.get(midIndex + 1).remoteLogStartOffset();
 
         // Wait for partial cleanup - only segments that have been tiered should be deleted
         retry(
                 Duration.ofMinutes(2),
                 () -> {
-                    // Some segments should be deleted (those that have been tiered)
+                    // Some segments should be deleted (those with endOffset <= partialLakeOffset)
                     int currentSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
-                    assertThat(currentSegmentCount).isLessThan(initialSegmentCount);
-                    // Remote log start offset should be updated to at least the tiered offset
+                    assertThat(currentSegmentCount).isEqualTo(expectedRemainingSegments);
+                    // Remote log start offset should be updated to the first remaining segment's
+                    // start
                     assertThat(remoteLogTablet.getRemoteLogStartOffset())
-                            .isGreaterThanOrEqualTo(expectedMinStartOffset);
+                            .isEqualTo(expectedNewStartOffset);
                     // Remaining segments should have remoteLogEndOffset > partialLakeOffset
                     assertThat(remoteLogTablet.allRemoteLogSegments())
                             .allSatisfy(
                                     segment ->
                                             assertThat(segment.remoteLogEndOffset())
-                                                    .isGreaterThan(expectedMinStartOffset));
+                                                    .isGreaterThan(partialLakeOffset));
                 });
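// ---- Sketch (offsets invented for illustration; not code from this patch):
// ---- the arithmetic behind expectedRemainingSegments and expectedNewStartOffset
// ---- in Stage C. Tiering up to the middle segment's end offset makes segments
// ---- 0..midIndex deletable and leaves the rest.
public class PartialCleanupMathSketch {
    public static void main(String[] args) {
        long[] sortedEndOffsets = {30L, 60L, 90L, 120L};      // 4 segments, sorted
        int midIndex = sortedEndOffsets.length / 2;           // 2
        long partialLakeOffset = sortedEndOffsets[midIndex];  // 90
        // endOffset <= 90 covers indexes 0..2, so one segment remains
        int expectedRemaining = sortedEndOffsets.length - midIndex - 1; // 1
        System.out.println(partialLakeOffset + " -> " + expectedRemaining + " remaining");
    }
}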
 
-        // Now update lake log end offset to include all segments
-        logTablet.updateLakeLogEndOffset(remoteLogEndOffset);
+        // Step 2: Fully update lake log end offset to trigger complete cleanup
+        logTablet.updateLakeLogEndOffset(stageBRemoteLogEndOffset);
 
-        // Wait for all segments to be cleaned up
+        // Wait for complete cleanup - all segments should be deleted
         retry(
                 Duration.ofMinutes(2),
                 () -> {
-                    // All segments should now be deleted
                     assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
                     assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
                 });
-    }
-
-    @Test
-    void testDynamicLakeEnableAffectsTTL() throws Exception {
-        TablePath tablePath = TablePath.of("fluss", "test_dynamic_lake_ttl");
-        // Create table without data lake enabled initially
-        TableDescriptor tableDescriptor =
-                TableDescriptor.builder()
-                        .schema(DATA1_SCHEMA)
-                        .distributedBy(1)
-                        // Set a short TTL for testing (1 hour)
-                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(1))
-                        .build();
-
-        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
-        TableBucket tb = new TableBucket(tableId, 0);
-        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
-
-        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
-        TabletServerGateway leaderGateway =
-                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
-
-        // Produce records to create remote log segments
-        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb);
-
-        TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
-        RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
-        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
-
-        int initialSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
-        assertThat(initialSegmentCount).isGreaterThan(0);
-        // Record the remote log end offset
-        long remoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L);
-        assertThat(remoteLogEndOffset).isGreaterThan(0L);
-
-        Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
-        LogTablet logTablet = leaderReplica.getLogTablet();
-        assertThat(logTablet.isDataLakeEnabled()).isFalse();
-
-        // Dynamically enable data lake using admin API
-        CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+        // ==================== Stage D: Dynamic Disable ====================
+        // Dynamically disable data lake
         coordinatorGateway
                 .alterTable(
                         newAlterTableRequest(
                                 tablePath,
                                 Collections.singletonMap(
-                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"),
+                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"),
                                 Collections.emptyList(),
                                 Collections.emptyList(),
                                 false))
                 .get();
-        retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isTrue());
+        retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isFalse());
+
+        // Produce new data after disabling lake (baseOffset = 200 from Stages A and B)
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 200L);
+        retry(
+                Duration.ofMinutes(2),
+                () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
 
         // Advance time past TTL (1 hour + buffer)
         MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
 
-        // Since lake is now enabled but no data has been tiered (lakeLogEndOffset = -1),
-        // the expired segments should NOT be deleted
-        assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(initialSegmentCount);
-
-        // Now disable data lake using admin API
-        coordinatorGateway
-                .alterTable(
-                        newAlterTableRequest(
-                                tablePath,
-                                Collections.singletonMap(
-                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                false))
-                .get();
-        retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isFalse());
-
-        // Wait for cleanup - now segments should be deleted since lake is disabled
+        // Since lake is disabled, expired segments should be deleted directly,
+        // ignoring the lake offset status
         retry(
                 Duration.ofMinutes(2),
                 () -> {