diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index eba41e0788..be894ee790 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -340,23 +340,6 @@ public static TableInfo of( modifiedTime); } - /** Replace a TableInfo with a new SchemaInfo. */ - public TableInfo withNewSchema(SchemaInfo schemaInfo) { - return new TableInfo( - tablePath, - tableId, - schemaInfo.getSchemaId(), - schemaInfo.getSchema(), - bucketKeys, - partitionKeys, - numBuckets, - properties, - customProperties, - comment, - createdTime, - modifiedTime); - } - @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 6fb7003223..cf6def30fd 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -27,9 +27,11 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.DataTypes; @@ -59,6 +61,7 @@ import javax.annotation.Nullable; import java.nio.file.Files; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -72,6 +75,7 @@ import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -584,6 +588,7 @@ void testAlterLakeEnabledLogTable() throws Exception { .build(); TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter"); admin.createTable(logTablePath, logTable, false).get(); + long tableId = admin.getTableInfo(logTablePath).get().getTableId(); assertThatThrownBy( () -> @@ -591,6 +596,9 @@ void testAlterLakeEnabledLogTable() throws Exception { Identifier.create(DATABASE, logTablePath.getTableName()))) .isInstanceOf(Catalog.TableNotExistException.class); + // verify LogTablet datalake status is initially disabled + verifyLogTabletDataLakeEnabled(tableId, false); + // enable lake TableChange.SetOption enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); @@ -598,6 +606,9 @@ void testAlterLakeEnabledLogTable() throws Exception { admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is enabled + verifyLogTabletDataLakeEnabled(tableId, true); + Identifier paimonTablePath = Identifier.create(DATABASE, logTablePath.getTableName()); Table enabledPaimonLogTable = 
paimonCatalog.getTable(paimonTablePath); @@ -635,11 +646,17 @@ void testAlterLakeEnabledLogTable() throws Exception { // paimon table should still exist although lake is disabled paimonCatalog.getTable(paimonTablePath); + // verify LogTablet datalake status is disabled + verifyLogTabletDataLakeEnabled(tableId, false); + // try to enable lake table again enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); changes = Collections.singletonList(enableLake); admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is enabled again + verifyLogTabletDataLakeEnabled(tableId, true); + // write some data to the lake table writeData(paimonCatalog.getTable(paimonTablePath)); Optional snapshot = paimonCatalog.getTable(paimonTablePath).latestSnapshot(); @@ -649,10 +666,16 @@ void testAlterLakeEnabledLogTable() throws Exception { changes = Collections.singletonList(disableLake); admin.alterTable(logTablePath, changes, false).get(); + // verify LogTablet datalake status is disabled again + verifyLogTabletDataLakeEnabled(tableId, false); + // try to enable lake table again, the snapshot should not change changes = Collections.singletonList(enableLake); admin.alterTable(logTablePath, changes, false).get(); assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot); + + // verify LogTablet datalake status is enabled + verifyLogTabletDataLakeEnabled(tableId, true); } @Test @@ -1021,6 +1044,19 @@ private void verifyPaimonTable( assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment()); } + private void verifyLogTabletDataLakeEnabled(long tableId, boolean isDataLakeEnabled) { + for (int bucket = 0; bucket < BUCKET_NUM; bucket++) { + TableBucket tb = new TableBucket(tableId, bucket); + retry( + Duration.ofMinutes(1), + () -> { + Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + assertThat(replica.getLogTablet().isDataLakeEnabled()) + .isEqualTo(isDataLakeEnabled); + }); + } + } + private TableDescriptor createTableDescriptor( int columnNum, int bucketNum, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 55b59753c7..d0dd0de213 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -85,6 +85,7 @@ import org.apache.fluss.server.coordinator.event.RebalanceEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.rebalance.RebalanceManager; @@ -125,6 +126,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -560,6 +562,8 @@ public void process(CoordinatorEvent event) { } else if (event instanceof SchemaChangeEvent) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; processSchemaChange(schemaChangeEvent); + } else if (event instanceof 
TableRegistrationChangeEvent) { + processTableRegistrationChange((TableRegistrationChangeEvent) event); } else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) { processNotifyLeaderAndIsrResponseReceivedEvent( (NotifyLeaderAndIsrResponseReceivedEvent) event); @@ -720,6 +724,63 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) { null); } + private void processTableRegistrationChange(TableRegistrationChangeEvent event) { + TablePath tablePath = event.getTablePath(); + Long tableId = coordinatorContext.getTableIdByPath(tablePath); + + // Skip if the table is not yet registered in coordinator context. + // Should not happen in normal cases. + if (tableId == null) { + LOG.warn( + "Table {} is not registered in coordinator context, " + + "skip processing table registration change.", + tablePath); + return; + } + + TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId); + + TableInfo newTableInfo = + event.getTableRegistration().toTableInfo(tablePath, oldTableInfo.getSchemaInfo()); + coordinatorContext.putTableInfo(newTableInfo); + postAlterTableProperties(oldTableInfo, newTableInfo); + + // Notify tablet servers about the metadata change + updateTabletServerMetadataCache( + new HashSet<>(coordinatorContext.getLiveTabletServers().values()), + tableId, + null, + null); + } + + private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) { + boolean dataLakeEnabled = newTableInfo.getTableConfig().isDataLakeEnabled(); + boolean toEnableDataLake = + !oldTableInfo.getTableConfig().isDataLakeEnabled() + && newTableInfo.getTableConfig().isDataLakeEnabled(); + boolean toDisableDataLake = + oldTableInfo.getTableConfig().isDataLakeEnabled() + && !newTableInfo.getTableConfig().isDataLakeEnabled(); + + if (toEnableDataLake) { + // if the table is lake table, we need to add it to lake table tiering manager + lakeTableTieringManager.addNewLakeTable(newTableInfo); + } else if (toDisableDataLake) { + lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId()); + } else if (dataLakeEnabled) { + // The table is still a lake table, check if freshness has changed + Duration oldFreshness = oldTableInfo.getTableConfig().getDataLakeFreshness(); + Duration newFreshness = newTableInfo.getTableConfig().getDataLakeFreshness(); + + // Check if freshness has changed + if (!Objects.equals(oldFreshness, newFreshness)) { + lakeTableTieringManager.updateTableLakeFreshness( + newTableInfo.getTableId(), newFreshness.toMillis()); + } + } + // more post-alter actions can be added here + } + private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { long partitionId = createPartitionEvent.getPartitionId(); // skip the partition if it already exists diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 35527805c0..436cc52038 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -280,6 +280,7 @@ public void addStopReplicaRequestForTabletServers( *
  • case8: One newly tabletServer added into cluster *
  • case9: One tabletServer is removed from cluster *
  • case10: schemaId is changed after table is created. + *
• case11: TableRegistration changed after table is created. + * */ // todo: improve this with different phase enum.
@@ -300,7 +301,7 @@ public void addUpdateMetadataRequestForTabletServers( .computeIfAbsent(tableId, k -> new HashMap<>()) .put(partitionId, Collections.emptyList()); } else { - // case3, case4, case10 + // case3, case4, case10, case11 updateMetadataRequestBucketMap.put(tableId, Collections.emptyList()); } } else {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 94d56dd338..a9cd5b3c5f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -378,8 +378,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques + "table properties or table schema."); } - LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer = - lakeCatalogDynamicLoader.getLakeCatalogContainer(); LakeCatalog.Context lakeCatalogContext = new DefaultLakeCatalogContext(false, currentSession().getPrincipal());
@@ -388,7 +386,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques tablePath, alterSchemaChanges, request.isIgnoreIfNotExists(), - lakeCatalogContainer.getLakeCatalog(), lakeCatalogContext); }
@@ -398,8 +395,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques alterTableConfigChanges, tablePropertyChanges, request.isIgnoreIfNotExists(), - lakeCatalogContainer.getLakeCatalog(), - lakeTableTieringManager, lakeCatalogContext); }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index f728e91da5..8d7b61a088 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,14 +60,12 @@ import javax.annotation.Nullable; -import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable;
@@ -327,7 +325,6 @@ public void alterTableSchema( TablePath tablePath, List<TableChange> schemaChanges, boolean ignoreIfNotExists, - @Nullable LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) throws TableNotExistException, TableNotPartitionedException { try {
@@ -339,8 +336,7 @@ public void alterTableSchema( Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); // Lake First: sync to Lake before updating Fluss schema - syncSchemaChangesToLake( - tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext); + syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext); // Update Fluss schema (ZK) after Lake sync succeeds if (!newSchema.equals(table.getSchema())) {
@@ -370,12 +366,13 @@ private void syncSchemaChangesToLake( TablePath tablePath, TableInfo tableInfo, List<TableChange> schemaChanges, - @Nullable LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) { if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) { return; } + LakeCatalog lakeCatalog = + lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); if (lakeCatalog == null) { throw new
InvalidAlterTableException( "Cannot alter schema for datalake enabled table " @@ -399,8 +396,6 @@ public void alterTableProperties( List tableChanges, TablePropertyChanges tablePropertyChanges, boolean ignoreIfNotExists, - @Nullable LakeCatalog lakeCatalog, - LakeTableTieringManager lakeTableTieringManager, LakeCatalog.Context lakeCatalogContext) { try { // it throws TableNotExistException if the table or database not exists @@ -431,22 +426,12 @@ public void alterTableProperties( tableDescriptor, newDescriptor, tableChanges, - lakeCatalog, lakeCatalogContext); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( newDescriptor.getProperties(), newDescriptor.getCustomProperties()); zookeeperClient.updateTable(tablePath, updatedTableRegistration); - - // post alter table properties, e.g. add the table to lake table tiering manager if - // it's to enable datalake for the table - postAlterTableProperties( - tablePath, - schemaInfo, - tableDescriptor, - updatedTableRegistration, - lakeTableTieringManager); } else { LOG.info( "No properties changed when alter table {}, skip update table.", tablePath); @@ -471,8 +456,10 @@ private void preAlterTableProperties( TableDescriptor tableDescriptor, TableDescriptor newDescriptor, List tableChanges, - LakeCatalog lakeCatalog, LakeCatalog.Context lakeCatalogContext) { + LakeCatalog lakeCatalog = + lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); + if (isDataLakeEnabled(newDescriptor)) { if (lakeCatalog == null) { throw new InvalidAlterTableException( @@ -511,41 +498,6 @@ private void preAlterTableProperties( } } - private void postAlterTableProperties( - TablePath tablePath, - SchemaInfo schemaInfo, - TableDescriptor oldTableDescriptor, - TableRegistration newTableRegistration, - LakeTableTieringManager lakeTableTieringManager) { - - boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties); - boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled; - boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled; - - if (toEnableDataLake) { - TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo); - // if the table is lake table, we need to add it to lake table tiering manager - lakeTableTieringManager.addNewLakeTable(newTableInfo); - } else if (toDisableDataLake) { - lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId); - } else if (dataLakeEnabled) { - // The table is still a lake table, check if freshness has changed - Duration oldFreshness = - Configuration.fromMap(oldTableDescriptor.getProperties()) - .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); - Duration newFreshness = - Configuration.fromMap(newTableRegistration.properties) - .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); - - // Check if freshness has changed - if (!Objects.equals(oldFreshness, newFreshness)) { - lakeTableTieringManager.updateTableLakeFreshness( - newTableRegistration.tableId, newFreshness.toMillis()); - } - } - // more post-alter actions can be added here - } - /** * Get a new TableDescriptor with updated properties. 
* @@ -587,11 +539,6 @@ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) { return Boolean.parseBoolean(dataLakeEnabledValue); } - private boolean isDataLakeEnabled(Map<String, String> properties) { - String dataLakeEnabledValue = properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); - return Boolean.parseBoolean(dataLakeEnabledValue); - } - public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) { if (tableLakeOptions == null || tableLakeOptions.isEmpty()) { return; }
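Reviewer note: the postAlterTableProperties logic deleted from MetadataManager above is re-implemented in CoordinatorEventProcessor.postAlterTableProperties (earlier in this patch), driven by TableInfo instead of raw property maps. A minimal standalone sketch of the decision it implements; the class and method names (LakeToggleSketch, decide) are illustrative and not part of the patch:

    import java.time.Duration;
    import java.util.Objects;

    public class LakeToggleSketch {
        enum LakeToggleAction { ENABLE_TIERING, DISABLE_TIERING, UPDATE_FRESHNESS, NONE }

        // Mirrors postAlterTableProperties: enable/disable transitions win,
        // otherwise a freshness change on a still-enabled table updates tiering.
        static LakeToggleAction decide(
                boolean oldEnabled, boolean newEnabled,
                Duration oldFreshness, Duration newFreshness) {
            if (!oldEnabled && newEnabled) {
                return LakeToggleAction.ENABLE_TIERING;   // addNewLakeTable(...)
            } else if (oldEnabled && !newEnabled) {
                return LakeToggleAction.DISABLE_TIERING;  // removeLakeTable(...)
            } else if (newEnabled && !Objects.equals(oldFreshness, newFreshness)) {
                return LakeToggleAction.UPDATE_FRESHNESS; // updateTableLakeFreshness(...)
            }
            return LakeToggleAction.NONE;
        }

        public static void main(String[] args) {
            // enable -> the tiering manager picks the table up
            System.out.println(decide(false, true, null, Duration.ofMinutes(3)));
            // still enabled, freshness tightened -> tiering schedule updated
            System.out.println(decide(true, true, Duration.ofMinutes(3), Duration.ofMinutes(1)));
        }
    }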
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java new file mode 100644 index 0000000000..bf8c610a8c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
@@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.data.TableRegistration; + +import java.util.Objects; + +/** An event for table registration change. */ +public class TableRegistrationChangeEvent implements CoordinatorEvent { + private final TablePath tablePath; + private final TableRegistration tableRegistration; + + public TableRegistrationChangeEvent(TablePath tablePath, TableRegistration tableRegistration) { + this.tablePath = tablePath; + this.tableRegistration = tableRegistration; + } + + public TablePath getTablePath() { + return tablePath; + } + + public TableRegistration getTableRegistration() { + return tableRegistration; + } + + @Override + public String toString() { + return "TableRegistrationChangeEvent{" + "tablePath=" + tablePath + ", tableRegistration=" + tableRegistration + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + TableRegistrationChangeEvent that = (TableRegistrationChangeEvent) o; + return Objects.equals(tablePath, that.tablePath) + && Objects.equals(tableRegistration, that.tableRegistration); + } + + @Override + public int hashCode() { + return Objects.hash(tablePath, tableRegistration); + } +}
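Reviewer note: the new event above is produced by the TableChangeWatcher change that follows, which tells an update apart from a creation by whether the previous ZNode carried a payload. A tiny self-contained sketch of that predicate, under the same assumption the patch makes (a creation delivers no oldData or an empty payload); the class and method names are illustrative:

    final class NodeEventKindSketch {
        // true  -> treat the ZK node event as a registration change,
        // false -> treat it as a table creation (no previous payload).
        static boolean isRegistrationChange(byte[] previousPayload) {
            return previousPayload != null && previousPayload.length > 0;
        }

        public static void main(String[] args) {
            System.out.println(isRegistrationChange(null));           // false -> creation
            System.out.println(isRegistrationChange(new byte[0]));    // false -> creation
            System.out.println(isRegistrationChange(new byte[] {1})); // true  -> change
        }
    }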
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index 09c273e7d0..5b4821f194 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
@@ -30,6 +30,7 @@ import org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.EventManager; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment;
@@ -127,7 +128,16 @@ public void event(Type type, ChildData oldData, ChildData newData) { if (tablePath == null) { break; } - processCreateTable(tablePath, newData); + // Distinguish between table creation and properties change. + // If oldData exists and contains valid table registration data, + // it's a properties change; otherwise, it's a table creation. + if (oldData != null + && oldData.getData() != null + && oldData.getData().length > 0) { + processTableRegistrationChange(tablePath, newData); + } else { + processCreateTable(tablePath, newData); + } } break; }
@@ -241,6 +251,11 @@ private void processCreatePartition( new CreatePartitionEvent( tablePath, tableId, partitionId, partitionName, partitionAssignment)); } + + private void processTableRegistrationChange(TablePath tablePath, ChildData newData) { + TableRegistration newTable = TableZNode.decode(newData.getData()); + eventManager.put(new TableRegistrationChangeEvent(tablePath, newTable)); + } } private void processSchemaChange(TablePath tablePath, int schemaId) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index a4d7668f9d..3f333dc10f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -529,8 +529,6 @@ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) { public void updateLakeTableSnapshotId(long snapshotId) { if (snapshotId > this.lakeTableSnapshotId) { this.lakeTableSnapshotId = snapshotId; - // it means the data lake is enabled if we have got the snapshot id - this.isDataLakeEnabled = true; } }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e0f3d3933d..50fea1275b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -560,6 +560,32 @@ public void updateLeaderEndOffsetSnapshot() { logTablet.updateLeaderEndOffsetSnapshot(); } + public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) { + boolean old = logTablet.isDataLakeEnabled(); + if (old == isDataLakeEnabled) { + return; + } + + logTablet.updateIsDataLakeEnabled(isDataLakeEnabled); + + if (isLeader()) { + if (isDataLakeEnabled) { + registerLakeTieringMetrics(); + } else { + if (lakeTieringMetricGroup != null) { + lakeTieringMetricGroup.close(); + lakeTieringMetricGroup = null; + } + } + } + + LOG.info( + "Replica for {} isDataLakeEnabled changed from {} to {}", + tableBucket, + old, + isDataLakeEnabled); + } + private void createKv() { try { // create a closeable registry for the closable related to kv
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 421db4a068..b5a5ecc54b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -84,6 +84,7 @@ import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.log.remote.RemoteLogManager; import org.apache.fluss.server.metadata.ClusterMetadata; +import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metrics.UserMetrics; import org.apache.fluss.server.metrics.group.BucketMetricGroup;
@@ -459,9 +460,30 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust // check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(coordinatorEpoch, "updateMetadataCache"); metadataCache.updateClusterMetadata(clusterMetadata); + updateReplicaTableConfig(clusterMetadata); }); } + private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { + for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { + TableInfo tableInfo = tableMetadata.getTableInfo(); + if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) { + long tableId = tableInfo.getTableId(); + boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled(); + + for (Map.Entry entry : allReplicas.entrySet()) { + HostedReplica hostedReplica = entry.getValue(); + if (hostedReplica instanceof OnlineReplica) { + Replica replica = ((OnlineReplica) hostedReplica).getReplica(); + if (replica.getTableBucket().getTableId() == tableId) { + replica.updateIsDataLakeEnabled(dataLakeEnabled); + } + } + } + } + } + } + /** * Append log records to leader replicas of the buckets, and wait for them to be replicated to * other replicas. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index dfb4088e49..d4ec8f8db2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -53,6 +53,7 @@ import org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.CommitRemoteLogManifestData; +import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; import org.apache.fluss.server.metadata.BucketMetadata; @@ -906,6 +907,70 @@ void testSchemaChange() throws Exception { 3, new TableMetadata(tableInfo2, Collections.emptyList()))); } + @Test + void testTableRegistrationChange() throws Exception { + // make sure all request to gateway should be successful + initCoordinatorChannel(); + + // create a table + TablePath t1 = TablePath.of(defaultDatabase, "test_table_registration_change"); + int nBuckets = 1; + int replicationFactor = 3; + TableAssignment tableAssignment = + generateAssignment( + nBuckets, + replicationFactor, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + // create table + List replicas = tableAssignment.getBucketAssignment(0).getReplicas(); + metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + TableInfo tableInfo = metadataManager.getTable(t1); + + retry( + Duration.ofMinutes(1), + () -> + verifyMetadataUpdateRequest( + 3, + new TableMetadata( + tableInfo, + Collections.singletonList( + new BucketMetadata( + 0, replicas.get(0), 0, replicas))))); + + // alter table properties (custom property) + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setCustomProperty("custom.key", "custom.value"); + TablePropertyChanges tablePropertyChanges = builder.build(); + metadataManager.alterTableProperties( + t1, Collections.emptyList(), tablePropertyChanges, false, null); + + // get updated table info and verify metadata update request is sent + TableInfo updatedTableInfo = metadataManager.getTable(t1); + 
assertThat(updatedTableInfo.getCustomProperties().toMap()) + .containsEntry("custom.key", "custom.value"); + + retry( + Duration.ofMinutes(1), + () -> + verifyMetadataUpdateRequest( + 3, new TableMetadata(updatedTableInfo, Collections.emptyList()))); + + // verify the table info in coordinator context is updated + retryVerifyContext( + ctx -> { + Long tableId = ctx.getTableIdByPath(t1); + assertThat(tableId).isNotNull(); + TableInfo tableInfoInCtx = ctx.getTableInfoById(tableId); + assertThat(tableInfoInCtx).isNotNull(); + assertThat(tableInfoInCtx.getCustomProperties().toMap()) + .containsEntry("custom.key", "custom.value"); + }); + } + @Test void testDoBucketReassignment() throws Exception { zookeeperClient.registerTabletServer( @@ -1305,7 +1370,7 @@ private long createTable(TablePath tablePath, TabletServerInfo[] servers) { } private void alterTable(TablePath tablePath, List schemaChanges) { - metadataManager.alterTableSchema(tablePath, schemaChanges, true, null, null); + metadataManager.alterTableSchema(tablePath, schemaChanges, true, null); } private TableDescriptor getPartitionedTable() { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index 2472d5041f..dd25e0ef14 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -156,7 +156,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(setProperties, new ArrayList<>()), + setProperties, + Collections.emptyList(), Collections.emptyList(), false)) .get(); @@ -185,7 +186,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(new HashMap<>(), resetProperties), + Collections.emptyMap(), + resetProperties, Collections.emptyList(), false)) .get(); @@ -214,7 +216,8 @@ void testAlterAndResetTableDatalakeProperties() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(new HashMap<>(), resetProperties2), + Collections.emptyMap(), + resetProperties2, Collections.emptyList(), false)) .get(); @@ -280,7 +283,8 @@ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(setProperties, new ArrayList<>()), + setProperties, + Collections.emptyList(), Collections.emptyList(), false)) .get(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index f21d5bdd86..155b2c4fe6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -23,7 +23,6 @@ import org.apache.fluss.config.AutoPartitionTimeUnit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -51,7 +50,6 @@ import org.apache.fluss.rpc.messages.MetadataRequest; import 
org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbAddColumn; -import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbBucketMetadata; import org.apache.fluss.rpc.messages.PbPartitionMetadata; import org.apache.fluss.rpc.messages.PbServerNode; @@ -303,7 +301,8 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { .alterTable( newAlterTableRequest( tablePath, - alterTableProperties(setProperties, resetProperties), + setProperties, + resetProperties, Collections.emptyList(), false)) .get(); @@ -679,7 +678,11 @@ void testSchemaEvolution() throws Exception { adminGateway .alterTable( newAlterTableRequest( - tablePath, Collections.emptyList(), alterTableAddColumns(), false)) + tablePath, + Collections.emptyMap(), + Collections.emptyList(), + alterTableAddColumns(), + false)) .get(); // restart coordinatorServer @@ -820,28 +823,6 @@ private static TableDescriptor newPkTable() { .build(); } - private static List alterTableProperties( - Map setProperties, List resetProperties) { - List res = new ArrayList<>(); - - for (Map.Entry entry : setProperties.entrySet()) { - PbAlterConfig info = new PbAlterConfig(); - info.setConfigKey(entry.getKey()); - info.setConfigValue(entry.getValue()); - info.setOpType(AlterConfigOpType.SET.value()); - res.add(info); - } - - for (String resetProperty : resetProperties) { - PbAlterConfig info = new PbAlterConfig(); - info.setConfigKey(resetProperty); - info.setOpType(AlterConfigOpType.DELETE.value()); - res.add(info); - } - - return res; - } - private static List alterTableAddColumns() { List addColumns = new ArrayList<>(); PbAddColumn newNestedColumn = new PbAddColumn(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 691f0d9ce3..76634b970c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -35,12 +35,15 @@ import org.apache.fluss.server.coordinator.event.DropPartitionEvent; import org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.coordinator.event.TestingEventManager; +import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; @@ -309,7 +312,6 @@ void testSchemaChanges() { null, TableChange.ColumnPosition.last())), false, - null, null); Schema newSchema = Schema.newBuilder() @@ -329,4 +331,64 @@ void testSchemaChanges() { assertThat(eventManager.getEvents()) .containsExactlyInAnyOrderElementsOf(allEvents)); } + + @Test + void testTableRegistrationChange() { + // create a table + TablePath tablePath = TablePath.of(DEFAULT_DB, "table_registration_change"); + TableAssignment tableAssignment = + 
generateAssignment( + 3, + 3, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + long tableId = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); + long currentMillis = System.currentTimeMillis(); + + List expectedEvents = new ArrayList<>(); + expectedEvents.add( + new CreateTableEvent( + TableInfo.of( + tablePath, + tableId, + schemaInfo.getSchemaId(), + TEST_TABLE, + currentMillis, + currentMillis), + tableAssignment)); + expectedEvents.add(new SchemaChangeEvent(tablePath, schemaInfo)); + + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + + // alter table properties (custom property) + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setCustomProperty("custom.key", "custom.value"); + TablePropertyChanges tablePropertyChanges = builder.build(); + metadataManager.alterTableProperties( + tablePath, Collections.emptyList(), tablePropertyChanges, false, null); + + // get the updated table registration + TableRegistration updatedTableRegistration = + metadataManager.getTableRegistration(tablePath); + + // verify TableRegistrationChangeEvent is generated + expectedEvents.add(new TableRegistrationChangeEvent(tablePath, updatedTableRegistration)); + + metadataManager.dropTable(tablePath, false); + expectedEvents.add(new DropTableEvent(tableId, false, false)); + + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java index 3187e5db58..90dfa0914e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java @@ -22,17 +22,24 @@ import org.apache.fluss.config.MemorySize; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.log.FetchParams; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.tablet.TabletServer; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.FlussPaths; +import org.apache.fluss.utils.clock.ManualClock; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -41,14 +48,19 @@ import java.time.Duration; import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static 
org.apache.fluss.record.TestData.DATA1; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest; import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; @@ -58,11 +70,15 @@ /** ITCase for remote log. */ public class RemoteLogITCase { + + private static final ManualClock MANUAL_CLOCK = new ManualClock(System.currentTimeMillis()); + @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder() .setNumOfTabletServers(3) .setClusterConf(initConfig()) + .setClock(MANUAL_CLOCK) .build(); private TableBucket setupTableBucket() throws Exception { @@ -75,6 +91,11 @@ private TableBucket setupTableBucket() throws Exception { private void produceRecordsAndWaitRemoteLogCopy( TabletServerGateway leaderGateway, TableBucket tb) throws Exception { + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L); + } + + private void produceRecordsAndWaitRemoteLogCopy( + TabletServerGateway leaderGateway, TableBucket tb, long baseOffset) throws Exception { for (int i = 0; i < 10; i++) { assertProduceLogResponse( leaderGateway @@ -86,7 +107,7 @@ private void produceRecordsAndWaitRemoteLogCopy( genMemoryLogRecordsByObject(DATA1))) .get(), 0, - i * 10L); + baseOffset + i * 10L); } FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote( new TableBucket(tb.getTableId(), 0)); @@ -202,6 +223,164 @@ void testFollowerFetchAlreadyMoveToRemoteLog(boolean withWriterId) throws Except FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower); } + @Test + void testRemoteLogTTLWithDynamicLakeToggle() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_remote_log_ttl_dynamic_lake"); + // ==================== Stage A: Lake Disabled (Initial) ==================== + // Create table without data lake enabled + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + // Set a short TTL for testing (1 hour) + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofHours(1)) + .build(); + + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId); + + // Produce records to create remote log segments + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb); + + TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId); + RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager(); + RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb); + + assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L); + + Replica leaderReplica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + LogTablet logTablet = leaderReplica.getLogTablet(); + assertThat(logTablet.isDataLakeEnabled()).isFalse(); + + // Advance time past TTL (1 hour + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30)); + + // Since lake is disabled, expired segments should be deleted directly + retry( + Duration.ofMinutes(2), + () -> { + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + + // ==================== Stage B: Dynamic Enable & Retention ==================== + // Dynamically enable data lake + CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + coordinatorGateway + .alterTable( + newAlterTableRequest( + tablePath, + Collections.singletonMap( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"), + Collections.emptyList(), + Collections.emptyList(), + false)) + .get(); + retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isTrue()); + + // Produce new data after enabling lake (baseOffset = 100 from Stage A) + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 100L); + retry( + Duration.ofMinutes(2), + () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0)); + int stageBSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + long stageBRemoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L); + assertThat(stageBRemoteLogEndOffset).isGreaterThan(0L); + + // Advance time past TTL (1 hour + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30)); + + // Since lake is enabled but no data has been tiered (lakeLogEndOffset = -1), + // the expired segments should NOT be deleted + assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(stageBSegmentCount); + + // ==================== Stage C: Lake Progress Update (Cleanup) ==================== + // Step 1: Partially update lake log end offset to trigger partial cleanup + // Get segments sorted by remoteLogEndOffset and use middle segment's end offset + List sortedSegments = + remoteLogTablet.allRemoteLogSegments().stream() + .sorted(Comparator.comparingLong(RemoteLogSegment::remoteLogEndOffset)) + .collect(Collectors.toList()); + assertThat(sortedSegments.size()).isGreaterThanOrEqualTo(2); + + // Use the end offset of a middle segment to ensure partial deletion + int midIndex = sortedSegments.size() / 2; + long partialLakeOffset = sortedSegments.get(midIndex).remoteLogEndOffset(); + logTablet.updateLakeLogEndOffset(partialLakeOffset); + + final int expectedRemainingSegments = sortedSegments.size() - midIndex - 1; + // The new remoteLogStartOffset should be the start offset of the first remaining segment + final long expectedNewStartOffset = sortedSegments.get(midIndex + 1).remoteLogStartOffset(); + + // Wait for partial cleanup - only segments that have been tiered should be deleted + retry( + Duration.ofMinutes(2), + () -> { + // Some segments should be deleted (those with endOffset <= partialLakeOffset) + int currentSegmentCount = remoteLogTablet.allRemoteLogSegments().size(); + assertThat(currentSegmentCount).isEqualTo(expectedRemainingSegments); + // Remote log start offset should be updated to the first remaining segment's + // start + assertThat(remoteLogTablet.getRemoteLogStartOffset()) + .isEqualTo(expectedNewStartOffset); + // Remaining segments should have remoteLogEndOffset > partialLakeOffset + 
assertThat(remoteLogTablet.allRemoteLogSegments()) + .allSatisfy( + segment -> + assertThat(segment.remoteLogEndOffset()) + .isGreaterThan(partialLakeOffset)); + }); + + // Step 2: Fully update lake log end offset to trigger complete cleanup + logTablet.updateLakeLogEndOffset(stageBRemoteLogEndOffset); + + // Wait for complete cleanup - all segments should be deleted + retry( + Duration.ofMinutes(2), + () -> { + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + + // ==================== Stage D: Dynamic Disable ==================== + // Dynamically disable data lake + coordinatorGateway + .alterTable( + newAlterTableRequest( + tablePath, + Collections.singletonMap( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"), + Collections.emptyList(), + Collections.emptyList(), + false)) + .get(); + retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isFalse()); + + // Produce new data after disabling lake (baseOffset = 200 from Stage A + B) + produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 200L); + retry( + Duration.ofMinutes(2), + () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0)); + + // Advance time past TTL (1 hour + buffer) + MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30)); + + // Since lake is disabled, expired segments should be deleted directly, + // ignoring the lake offset status + retry( + Duration.ofMinutes(2), + () -> { + assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + }); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_BUCKET_NUMBER, 1); @@ -213,6 +392,8 @@ private static Configuration initConfig() { // set a shorter max log time to allow replica shrink from isr. 
Don't be too low, otherwise // normal follower synchronization will also be affected conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(5)); + + conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON); return conf; } }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index c8e26ea7b5..fd487a0bc1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -752,6 +752,28 @@ void testRestore(@TempDir Path snapshotKvTabletDirPath) throws Exception { verifyGetKeyValues(kvTablet, expectedKeyValues); } + @Test + void testUpdateIsDataLakeEnabled() throws Exception { + Replica logReplica = + makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1)); + makeLogReplicaAsLeader(logReplica); + + // initial state should be false + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse(); + + // update to true + logReplica.updateIsDataLakeEnabled(true); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue(); + + // update with same value should not change anything (no-op) + logReplica.updateIsDataLakeEnabled(true); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue(); + + // update to false + logReplica.updateIsDataLakeEnabled(false); + assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse(); + } + private void makeLogReplicaAsLeader(Replica replica) throws Exception { makeLeaderReplica( replica,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index b567eff1f7..5055105e95 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -17,6 +17,7 @@ package org.apache.fluss.server.testutils; +import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.TableDescriptor;
@@ -75,6 +76,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -147,9 +149,25 @@ public static CreateTableRequest newCreateTableRequest( public static AlterTableRequest newAlterTableRequest( TablePath tablePath, - List<PbAlterConfig> alterConfigs, + Map<String, String> setProperties, + List<String> resetProperties, List<PbAddColumn> addColumns, boolean ignoreIfExists) { + List<PbAlterConfig> alterConfigs = new ArrayList<>(); + for (Map.Entry<String, String> entry : setProperties.entrySet()) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(entry.getKey()); + info.setConfigValue(entry.getValue()); + info.setOpType(AlterConfigOpType.SET.value()); + alterConfigs.add(info); + } + for (String resetProperty : resetProperties) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(resetProperty); + info.setOpType(AlterConfigOpType.DELETE.value()); + alterConfigs.add(info); + } + AlterTableRequest request = new AlterTableRequest(); request.addAllConfigChanges(alterConfigs) .addAllAddColumns(addColumns)
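Reviewer note: end-to-end, the toggle rides the existing alter-table path; nothing new is needed on the client. A minimal sketch of the call the ITCases above perform, assuming an already-connected Admin client (the Admin import path and the helper name toggleDataLake are assumptions; the TableChange.set and alterTable calls mirror testAlterLakeEnabledLogTable):

    import java.util.Collections;
    import java.util.List;

    import org.apache.fluss.client.admin.Admin; // client Admin interface; package path assumed
    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.metadata.TableChange;
    import org.apache.fluss.metadata.TablePath;

    public final class DataLakeToggleSketch {
        // Flip table.datalake.enabled through the regular alter-table path;
        // no restart or table re-creation is required.
        public static void toggleDataLake(Admin admin, TablePath tablePath, boolean enabled)
                throws Exception {
            List<TableChange> changes =
                    Collections.singletonList(
                            TableChange.set(
                                    ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
                                    Boolean.toString(enabled)));
            admin.alterTable(tablePath, changes, false).get();
        }
    }

Once the coordinator persists the change, TableChangeWatcher emits a TableRegistrationChangeEvent, the coordinator pushes fresh metadata to the tablet servers, and each server updates the LogTablet flag of its hosted replicas (leaders also register or close lake-tiering metrics), which is what verifyLogTabletDataLakeEnabled asserts above.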