Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,6 @@ public static TableInfo of(
modifiedTime);
}

/** Replace a TableInfo with a new SchemaInfo. */
public TableInfo withNewSchema(SchemaInfo schemaInfo) {
return new TableInfo(
tablePath,
tableId,
schemaInfo.getSchemaId(),
schemaInfo.getSchema(),
bucketKeys,
partitionKeys,
numBuckets,
properties,
customProperties,
comment,
createdTime,
modifiedTime);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;

Expand Down Expand Up @@ -59,6 +61,7 @@
import javax.annotation.Nullable;

import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -72,6 +75,7 @@
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -584,20 +588,27 @@ void testAlterLakeEnabledLogTable() throws Exception {
.build();
TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
admin.createTable(logTablePath, logTable, false).get();
long tableId = admin.getTableInfo(logTablePath).get().getTableId();

assertThatThrownBy(
() ->
paimonCatalog.getTable(
Identifier.create(DATABASE, logTablePath.getTableName())))
.isInstanceOf(Catalog.TableNotExistException.class);

// verify LogTablet datalake status is initially disabled
verifyLogTabletDataLakeEnabled(tableId, false);

// enable lake
TableChange.SetOption enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
List<TableChange> changes = Collections.singletonList(enableLake);

admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is enabled
verifyLogTabletDataLakeEnabled(tableId, true);

Identifier paimonTablePath = Identifier.create(DATABASE, logTablePath.getTableName());
Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);

Expand Down Expand Up @@ -635,11 +646,17 @@ void testAlterLakeEnabledLogTable() throws Exception {
// paimon table should still exist although lake is disabled
paimonCatalog.getTable(paimonTablePath);

// verify LogTablet datalake status is disabled
verifyLogTabletDataLakeEnabled(tableId, false);

// try to enable lake table again
enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is enabled again
verifyLogTabletDataLakeEnabled(tableId, true);

// write some data to the lake table
writeData(paimonCatalog.getTable(paimonTablePath));
Optional<Snapshot> snapshot = paimonCatalog.getTable(paimonTablePath).latestSnapshot();
Expand All @@ -649,10 +666,16 @@ void testAlterLakeEnabledLogTable() throws Exception {
changes = Collections.singletonList(disableLake);
admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is disabled again
verifyLogTabletDataLakeEnabled(tableId, false);

// try to enable lake table again, the snapshot should not change
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);

// verify LogTablet datalake status is enabled
verifyLogTabletDataLakeEnabled(tableId, true);
}

@Test
Expand Down Expand Up @@ -1021,6 +1044,19 @@ private void verifyPaimonTable(
assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment());
}

private void verifyLogTabletDataLakeEnabled(long tableId, boolean isDataLakeEnabled) {
for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
TableBucket tb = new TableBucket(tableId, bucket);
retry(
Duration.ofMinutes(1),
() -> {
Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
assertThat(replica.getLogTablet().isDataLakeEnabled())
.isEqualTo(isDataLakeEnabled);
});
}
}

private TableDescriptor createTableDescriptor(
int columnNum,
int bucketNum,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
Expand Down Expand Up @@ -125,6 +126,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -560,6 +562,8 @@ public void process(CoordinatorEvent event) {
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
processSchemaChange(schemaChangeEvent);
} else if (event instanceof TableRegistrationChangeEvent) {
processTableRegistrationChange((TableRegistrationChangeEvent) event);
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
processNotifyLeaderAndIsrResponseReceivedEvent(
(NotifyLeaderAndIsrResponseReceivedEvent) event);
Expand Down Expand Up @@ -720,6 +724,63 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) {
null);
}

private void processTableRegistrationChange(TableRegistrationChangeEvent event) {
TablePath tablePath = event.getTablePath();
Long tableId = coordinatorContext.getTableIdByPath(tablePath);

// Skip if the table is not yet registered in coordinator context.
// Should not happen in normal cases.
if (tableId == null) {
LOG.warn(
"Table {} is not registered in coordinator context, "
+ "skip processing table registration change.",
tablePath);
return;
}

TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId);

TableInfo newTableInfo =
event.getTableRegistration().toTableInfo(tablePath, oldTableInfo.getSchemaInfo());
coordinatorContext.putTableInfo(newTableInfo);
postAlterTableProperties(oldTableInfo, newTableInfo);

// Notify tablet servers about the metadata change
updateTabletServerMetadataCache(
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
tableId,
null,
null);
}

private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) {
boolean dataLakeEnabled = newTableInfo.getTableConfig().isDataLakeEnabled();
boolean toEnableDataLake =
!oldTableInfo.getTableConfig().isDataLakeEnabled()
&& newTableInfo.getTableConfig().isDataLakeEnabled();
boolean toDisableDataLake =
oldTableInfo.getTableConfig().isDataLakeEnabled()
&& !newTableInfo.getTableConfig().isDataLakeEnabled();

if (toEnableDataLake) {
// if the table is lake table, we need to add it to lake table tiering manager
lakeTableTieringManager.addNewLakeTable(newTableInfo);
} else if (toDisableDataLake) {
lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId());
} else if (dataLakeEnabled) {
// The table is still a lake table, check if freshness has changed
Duration oldFreshness = oldTableInfo.getTableConfig().getDataLakeFreshness();
Duration newFreshness = newTableInfo.getTableConfig().getDataLakeFreshness();

// Check if freshness has changed
if (!Objects.equals(oldFreshness, newFreshness)) {
lakeTableTieringManager.updateTableLakeFreshness(
newTableInfo.getTableId(), newFreshness.toMillis());
}
}
// more post-alter actions can be added here
}

private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
long partitionId = createPartitionEvent.getPartitionId();
// skip the partition if it already exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public void addStopReplicaRequestForTabletServers(
* <li>case8: One newly tabletServer added into cluster
* <li>case9: One tabletServer is removed from cluster
* <li>case10: schemaId is changed after table is created.
* <li>case 11: TableRegistration changed after table is created.
* </ol>
*/
// todo: improve this with different phase enum.
Expand All @@ -300,7 +301,7 @@ public void addUpdateMetadataRequestForTabletServers(
.computeIfAbsent(tableId, k -> new HashMap<>())
.put(partitionId, Collections.emptyList());
} else {
// case3, case4, case10
// case3, case4, case10, case 11
updateMetadataRequestBucketMap.put(tableId, Collections.emptyList());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
+ "table properties or table schema.");
}

LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
lakeCatalogDynamicLoader.getLakeCatalogContainer();
LakeCatalog.Context lakeCatalogContext =
new DefaultLakeCatalogContext(false, currentSession().getPrincipal());

Expand All @@ -388,7 +386,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
tablePath,
alterSchemaChanges,
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeCatalogContext);
}

Expand All @@ -398,8 +395,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
alterTableConfigChanges,
tablePropertyChanges,
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeTableTieringManager,
lakeCatalogContext);
}

Expand Down
Loading