From 87996bea8ba32916a5d401583b9398804e9549e1 Mon Sep 17 00:00:00 2001 From: Liebing Date: Fri, 23 Jan 2026 14:56:23 +0800 Subject: [PATCH] wip --- .../fluss/lake/lakestorage/LakeCatalog.java | 14 +- .../lake/lakestorage/LakeSnapshotInfo.java | 69 +++++ .../lakestorage/PluginLakeStorageWrapper.java | 9 + .../lake/lakestorage/LakeStorageTest.java | 7 + .../lake/values/TestingValuesLakeCatalog.java | 7 + .../lake/iceberg/IcebergLakeCatalog.java | 7 + .../fluss/lake/paimon/PaimonLakeCatalog.java | 34 +++ .../paimon/LakeEnabledTableCreateITCase.java | 236 ++++++++++++++---- .../testutils/FlinkPaimonTieringTestBase.java | 27 +- .../paimon/tiering/PaimonTieringITCase.java | 18 -- .../server/coordinator/CoordinatorServer.java | 11 +- .../server/coordinator/MetadataManager.java | 40 ++- .../server/utils/DatalakeEnableValidator.java | 227 +++++++++++++++++ .../lakehouse/TestingPaimonStoragePlugin.java | 8 + .../testutils/FlussClusterExtension.java | 2 +- .../utils/DatalakeEnableValidatorTest.java | 141 +++++++++++ 16 files changed, 775 insertions(+), 82 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeSnapshotInfo.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/utils/DatalakeEnableValidator.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/utils/DatalakeEnableValidatorTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 4cbccb6c1f..a14aff2df7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -26,9 +26,10 @@ import org.apache.fluss.security.acl.FlussPrincipal; import java.util.List; +import java.util.Optional; /** - * A catalog interface to modify metadata in external datalake. + * A catalog interface to modify or get metadata in external datalake. * * @since 0.7 */ @@ -57,6 +58,17 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context c void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException; + /** + * Get the latest snapshot info for the given table. + * + * @param tablePath path of the table to get the latest snapshot info + * @param context contextual information needed for get latest snapshot info + * @return the latest snapshot information for the given table, or empty if the table does not + * exist or has no snapshots. + * @since 0.9 + */ + Optional getLatestSnapshotInfo(TablePath tablePath, Context context); + @Override default void close() throws Exception { // default do nothing diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeSnapshotInfo.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeSnapshotInfo.java new file mode 100644 index 0000000000..7fcd0db989 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeSnapshotInfo.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lakestorage; + +import javax.annotation.Nullable; + +/** + * Represents the metadata information of a snapshot in a data lake table. + * + * @see LakeCatalog#getLatestSnapshotInfo + * @since 0.9 + */ +public class LakeSnapshotInfo { + + private final long snapshotId; + + private final long commitTimestampMillis; + + /** + * The {@code fluss-offsets} property recorded in the snapshot summary. + * + *
+     * <p>This property has two different formats depending on the Fluss version that produced the
+     * snapshot:
+     *
+     * <ul>
+     *   <li>v1 (JSON format, produced by Fluss 0.8): A JSON string starting with '{' that
+     *       contains the serialized {@code TableBucketOffsets} data directly.
+     *   <li>v2 (Path format, produced by Fluss 0.9+): A file path pointing to the offsets file,
+     *       following the pattern: {@code
+     *       {remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/metadata/{UUID}.offsets}
+     * </ul>
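+     *
+     * <p>Illustrative values (the ids and paths below are hypothetical, and the exact JSON
+     * field layout is defined by {@code TableBucketOffsets}):
+     *
+     * <pre>{@code
+     * v1: {"table_id": 1001, ...}
+     * v2: /remote/data/lake/db1/orders-1001/metadata/3f2a8c.offsets
+     * }</pre>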
+ */ + @Nullable private final String flussOffsetsProperty; + + public LakeSnapshotInfo( + long snapshotId, long commitTimestampMillis, @Nullable String flussOffsetsProperty) { + this.snapshotId = snapshotId; + this.commitTimestampMillis = commitTimestampMillis; + this.flussOffsetsProperty = flussOffsetsProperty; + } + + public long getSnapshotId() { + return snapshotId; + } + + public long getCommitTimestampMillis() { + return commitTimestampMillis; + } + + @Nullable + public String getFlussOffsetsProperty() { + return flussOffsetsProperty; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java index 37cd17ad7e..425edc8dbc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java @@ -29,6 +29,7 @@ import org.apache.fluss.utils.WrappingProxy; import java.util.List; +import java.util.Optional; /** * A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all @@ -91,6 +92,14 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } + @Override + public Optional getLatestSnapshotInfo( + TablePath tablePath, Context context) { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + return inner.getLatestSnapshotInfo(tablePath, context); + } + } + @Override public void close() throws Exception { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java index 178ec37af2..46b120e132 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -153,5 +154,11 @@ public void createTable( @Override public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException {} + + @Override + public Optional getLatestSnapshotInfo( + TablePath tablePath, Context context) { + return Optional.empty(); + } } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java index 3c56926955..ed0060d6ac 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java @@ -21,11 +21,13 @@ import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import java.util.List; +import java.util.Optional; /** Implementation of 
{@link LakeCatalog} for values lake. */ public class TestingValuesLakeCatalog implements LakeCatalog { @@ -40,4 +42,9 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont throws TableNotExistException { throw new RuntimeException("Not impl."); } + + @Override + public Optional getLatestSnapshotInfo(TablePath tablePath, Context context) { + return Optional.empty(); + } } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 9bd9d54e6c..cef19cda44 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -24,6 +24,7 @@ import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils; import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -52,6 +53,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; @@ -162,6 +164,11 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } + @Override + public Optional getLatestSnapshotInfo(TablePath tablePath, Context context) { + return Optional.empty(); + } + private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) { return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 500546e641..627fe3049b 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -22,12 +22,15 @@ import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.utils.IOUtils; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -44,6 +47,7 @@ import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; @@ -133,6 +137,36 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } + @Override + public Optional getLatestSnapshotInfo(TablePath tablePath, Context context) { + Identifier identifier = + Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); + 
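+        // A missing Paimon table is not an error here: enabling datalake on a table whose
+        // lake-side counterpart does not exist yet is the normal first-time-enable path, so
+        // report "no snapshot" instead of failing.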
Table paimonTable; + try { + paimonTable = paimonCatalog.getTable(identifier); + } catch (Catalog.TableNotExistException e) { + return Optional.empty(); + } + + FileStoreTable fileStoreTable = (FileStoreTable) paimonTable; + Snapshot snapshot = fileStoreTable.snapshotManager().latestSnapshot(); + if (snapshot == null) { + return Optional.empty(); + } + + String flussOffsets = + Optional.ofNullable(snapshot.properties()) + .map( + props -> + props.get( + LakeCommitter + .FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)) + .orElse(null); + + return Optional.of( + new LakeSnapshotInfo(snapshot.id(), snapshot.timeMillis(), flussOffsets)); + } + private boolean shouldAlterTable(TablePath tablePath, List tableChanges) throws TableNotExistException { try { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 6fb7003223..136ed59110 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -17,27 +17,28 @@ package org.apache.fluss.lake.paimon; -import org.apache.fluss.client.Connection; -import org.apache.fluss.client.ConnectionFactory; -import org.apache.fluss.client.admin.Admin; import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; +import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.clock.ManualClock; +import org.apache.flink.core.execution.JobClient; import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -51,14 +52,13 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import javax.annotation.Nullable; -import java.nio.file.Files; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -71,67 +71,35 @@ import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static 
org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; -import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; +import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for create lake enabled table with paimon as lake storage. */ -class LakeEnabledTableCreateITCase { +class LakeEnabledTableCreateITCase extends FlinkPaimonTieringTestBase { + + private static final ManualClock MANUAL_CLOCK = new ManualClock(System.currentTimeMillis()); @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder() .setNumOfTabletServers(3) .setClusterConf(initConfig()) + .setClock(MANUAL_CLOCK) .build(); private static final String DATABASE = "fluss"; - private static Catalog paimonCatalog; private static final int BUCKET_NUM = 3; - private Connection conn; - private Admin admin; - - @BeforeEach - protected void setup() { - conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); - admin = conn.getAdmin(); - } - - @AfterEach - protected void teardown() throws Exception { - if (admin != null) { - admin.close(); - admin = null; - } - - if (conn != null) { - conn.close(); - conn = null; - } + @BeforeAll + protected static void beforeAll() { + FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig()); } - private static Configuration initConfig() { - Configuration conf = new Configuration(); - conf.setString("datalake.format", "paimon"); - conf.setString("datalake.paimon.metastore", "filesystem"); - String warehousePath; - try { - warehousePath = - Files.createTempDirectory("fluss-testing-datalake-enabled") - .resolve("warehouse") - .toString(); - } catch (Exception e) { - throw new FlussRuntimeException("Failed to create warehouse path"); - } - conf.setString("datalake.paimon.warehouse", warehousePath); - conf.setString("datalake.paimon.cache-enabled", "false"); - paimonCatalog = - CatalogFactory.createCatalog( - CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))); - - return conf; + @Override + protected FlussClusterExtension getFlussClusterExtension() { + return FLUSS_CLUSTER_EXTENSION; } @Test @@ -650,9 +618,12 @@ void testAlterLakeEnabledLogTable() throws Exception { admin.alterTable(logTablePath, changes, false).get(); // try to enable lake table again, the snapshot should not change - changes = Collections.singletonList(enableLake); - admin.alterTable(logTablePath, changes, false).get(); - assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot); + List finalChanges = Collections.singletonList(enableLake); + assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges, false).get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "existing lake table has snapshots without 'fluss-offsets' property"); } @Test @@ -971,6 +942,161 @@ void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception { .isTrue(); } + @Test + void testEnableLakeTableWithManuallyCreatedLakeTable() throws Exception { + String tb = "test_enable_with_manual_lake_table"; + TablePath tablePath = TablePath.of(DATABASE, tb); + Identifier paimonIdentifier = Identifier.create(DATABASE, tb); + + // Step 1: Create a datalake-enabled table first to get the correct Paimon schema + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + 
Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .primaryKey("a") + .build()) + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + admin.createTable(tablePath, tableDescriptor, true).get(); + + // Step 2: Save the Paimon table schema for later recreation + Table originalPaimonTable = paimonCatalog.getTable(paimonIdentifier); + originalPaimonTable.options().remove(CoreOptions.PATH.key()); + org.apache.paimon.schema.Schema originalSchema = + new org.apache.paimon.schema.Schema( + originalPaimonTable.rowType().getFields(), + originalPaimonTable.partitionKeys(), + originalPaimonTable.primaryKeys(), + originalPaimonTable.options(), + originalPaimonTable.comment().orElse(null)); + + // Step 3: Disable datalake + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")), + false) + .get(); + + // Step 4: Drop the Paimon table and recreate with the same schema + paimonCatalog.dropTable(paimonIdentifier, false); + paimonCatalog.createTable(paimonIdentifier, originalSchema, false); + + // Step 5: Write some data to paimon table without fluss-offsets property + // (simulating user directly writing to lake) + Table paimonTable = paimonCatalog.getTable(paimonIdentifier); + writeData(paimonTable); + + // Step 6: Try to re-enable datalake, should fail because snapshot has no fluss-offsets + // property (data was written directly) + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED + .key(), + "true")), + false) + .get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("data was written directly into the lake table"); + } + + @Test + void testEnableLakeTableWithExpiredSnapshot() throws Exception { + TablePath tablePath = TablePath.of(DATABASE, "test_enable_with_expired_snapshot"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofMinutes(10)) + .distributedBy(1, "a") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + long tableId = tableInfo.getTableId(); + + TableBucket t1Bucket = new TableBucket(tableId, 0); + // write records + List rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + writeRows(tablePath, rows, true); + + // then start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + try { + // check the status of replica after synced + assertReplicaStatus(t1Bucket, 3); + // check data in paimon + checkDataInPaimonPrimaryKeyTable(tablePath, rows); + checkFlussOffsetsInSnapshot( + tablePath, Collections.singletonMap(new TableBucket(tableId, 0), 3L)); + + // disable and enable data lake should be ok + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + Boolean.FALSE.toString())), + false) + .get(); + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + Boolean.TRUE.toString())), + false) + .get(); + + // disable data lake and advance the clock to 
expire the snapshot + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + Boolean.FALSE.toString())), + false) + .get(); + MANUAL_CLOCK.advanceTime(Duration.ofHours(1)); + + // enable data lake should fail because the latest lake snapshot is older than + // table.log.ttl + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions + .TABLE_DATALAKE_ENABLED + .key(), + Boolean.TRUE.toString())), + false) + .get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "because the latest lake snapshot is older than table.log.ttl"); + } finally { + jobClient.cancel().get(); + } + } + private void verifyPaimonTable( Table paimonTable, TableDescriptor flussTable, diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index 90ab136fde..8128ba5292 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -72,6 +72,7 @@ import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; @@ -106,6 +107,10 @@ protected static Configuration initConfig() { throw new FlussRuntimeException("Failed to create warehouse path"); } conf.setString("datalake.paimon.warehouse", warehousePath); + conf.setString("datalake.paimon.cache-enabled", "false"); + paimonCatalog = + CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))); return conf; } @@ -113,7 +118,7 @@ public static void beforeAll(Configuration conf) { clientConf = conf; conn = ConnectionFactory.createConnection(clientConf); admin = conn.getAdmin(); - paimonCatalog = getPaimonCatalog(); + // paimonCatalog = getPaimonCatalog(); } @BeforeEach @@ -459,13 +464,29 @@ protected void checkDataInPaimonPrimaryKeyTable( } } + protected void checkDataInPaimonAppendOnlyTable( + TablePath tablePath, List expectedRows, long startingOffset) + throws Exception { + Iterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath); + Iterator flussRowIterator = expectedRows.iterator(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + InternalRow flussRow = flussRowIterator.next(); + assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); + assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); + // system columns are always the last three: __bucket, __offset, __timestamp + int offsetIndex = row.getFieldCount() - 2; + assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++); + } + assertThat(flussRowIterator.hasNext()).isFalse(); + } + protected CloseableIterator getPaimonRowCloseableIterator( TablePath tablePath) throws Exception { Identifier tableIdentifier 
= Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); - paimonCatalog = getPaimonCatalog(); - FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(tableIdentifier); RecordReader reader = diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index bf92cd5f57..1e3270e746 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -493,24 +493,6 @@ private Tuple2 createPartitionedTable( partitionedTableDescriptor); } - private void checkDataInPaimonAppendOnlyTable( - TablePath tablePath, List expectedRows, long startingOffset) - throws Exception { - Iterator paimonRowIterator = - getPaimonRowCloseableIterator(tablePath); - Iterator flussRowIterator = expectedRows.iterator(); - while (paimonRowIterator.hasNext()) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - InternalRow flussRow = flussRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); - assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); - // system columns are always the last three: __bucket, __offset, __timestamp - int offsetIndex = row.getFieldCount() - 2; - assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++); - } - assertThat(flussRowIterator.hasNext()).isFalse(); - } - private void checkDataInPaimonAppendOnlyPartitionedTable( TablePath tablePath, Map partitionSpec, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 76a3fbd68c..fd8805a1d2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -44,6 +44,8 @@ import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.ExecutorUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; import org.apache.fluss.utils.concurrent.FutureUtils; @@ -86,6 +88,8 @@ public class CoordinatorServer extends ServerBase { private final AtomicBoolean isShutDown = new AtomicBoolean(false); + private final Clock clock; + @GuardedBy("lock") private String serverId; @@ -142,9 +146,14 @@ public class CoordinatorServer extends ServerBase { private LakeCatalogDynamicLoader lakeCatalogDynamicLoader; public CoordinatorServer(Configuration conf) { + this(conf, SystemClock.getInstance()); + } + + public CoordinatorServer(Configuration conf, Clock clock) { super(conf); validateConfigs(conf); this.terminationFuture = new CompletableFuture<>(); + this.clock = clock; } public static void main(String[] args) { @@ -191,7 +200,7 @@ protected void startServices() throws Exception { this.lakeTableTieringManager = new LakeTableTieringManager(); MetadataManager metadataManager = - new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); + new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader, clock); this.ioExecutor = Executors.newFixedThreadPool( 
conf.get(ConfigOptions.SERVER_IO_POOL_SIZE), diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index f728e91da5..70cd5f1aa2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -52,6 +52,8 @@ import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.function.RunnableWithException; import org.apache.fluss.utils.function.ThrowingRunnable; @@ -72,6 +74,7 @@ import java.util.Set; import java.util.concurrent.Callable; +import static org.apache.fluss.server.utils.DatalakeEnableValidator.validateBeforeEnable; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor; @@ -84,6 +87,7 @@ public class MetadataManager { private final int maxPartitionNum; private final int maxBucketNum; private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; + private final Clock clock; public static final Set SENSITIVE_TABLE_OPTIONS = new HashSet<>(); @@ -93,20 +97,31 @@ public class MetadataManager { SENSITIVE_TABLE_OPTIONS.add("key"); } + public MetadataManager( + ZooKeeperClient zookeeperClient, + Configuration conf, + LakeCatalogDynamicLoader lakeCatalogDynamicLoader) { + this(zookeeperClient, conf, lakeCatalogDynamicLoader, SystemClock.getInstance()); + } + /** * Creates a new metadata manager. 
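+     *
+     * <p>The {@code clock} is injected so that tests can control time deterministically (for
+     * example with a {@code ManualClock}) when exercising TTL-related datalake validation.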
* * @param zookeeperClient the zookeeper client * @param conf the cluster configuration + * @param lakeCatalogDynamicLoader the lake catalog dynamic loader + * @param clock the clock */ public MetadataManager( ZooKeeperClient zookeeperClient, Configuration conf, - LakeCatalogDynamicLoader lakeCatalogDynamicLoader) { + LakeCatalogDynamicLoader lakeCatalogDynamicLoader, + Clock clock) { this.zookeeperClient = zookeeperClient; this.maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM); this.maxBucketNum = conf.get(ConfigOptions.MAX_BUCKET_NUM); this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader; + this.clock = clock; } public void createDatabase( @@ -432,7 +447,8 @@ public void alterTableProperties( newDescriptor, tableChanges, lakeCatalog, - lakeCatalogContext); + lakeCatalogContext, + tableInfo); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -472,7 +488,8 @@ private void preAlterTableProperties( TableDescriptor newDescriptor, List tableChanges, LakeCatalog lakeCatalog, - LakeCatalog.Context lakeCatalogContext) { + LakeCatalog.Context lakeCatalogContext, + TableInfo tableInfo) { if (isDataLakeEnabled(newDescriptor)) { if (lakeCatalog == null) { throw new InvalidAlterTableException( @@ -483,6 +500,23 @@ private void preAlterTableProperties( // to enable lake table if (!isDataLakeEnabled(tableDescriptor)) { + // validate before enabling datalake on an existing table + try { + validateBeforeEnable( + tablePath, + tableInfo, + lakeCatalog, + lakeCatalogContext, + zookeeperClient, + clock); + } catch (Exception e) { + if (e instanceof InvalidAlterTableException) { + throw (InvalidAlterTableException) e; + } + throw new FlussRuntimeException( + "Failed to validate enabling datalake for table " + tablePath, e); + } + // before create table in fluss, we may create in lake try { lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/DatalakeEnableValidator.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/DatalakeEnableValidator.java new file mode 100644 index 0000000000..167515d57b --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/DatalakeEnableValidator.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.lake.committer.LakeCommitter; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.lake.LakeTable; +import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.json.TableBucketOffsets; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +/** + * Helper for validating enabling datalake on an existing table. + * + *
+ * <p>The validator focuses on two aspects:
+ *
+ * <ul>
+ *   <li>case1: snapshot consistency between Fluss and the external lake table
+ *   <li>case2: TTL risk when the latest lake snapshot is much older than {@code table.log.ttl}
+ * </ul>
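+ *
+ * <p>Typical call site (sketch, mirroring {@code MetadataManager#preAlterTableProperties}):
+ *
+ * <pre>{@code
+ * // invoked when 'table.datalake.enabled' flips from false to true
+ * DatalakeEnableValidator.validateBeforeEnable(
+ *         tablePath, tableInfo, lakeCatalog, lakeCatalogContext, zookeeperClient, clock);
+ * }</pre>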
+ */ +public final class DatalakeEnableValidator { + + /** Perform validation before enabling datalake for an existing table. */ + public static void validateBeforeEnable( + TablePath tablePath, + TableInfo tableInfo, + LakeCatalog lakeCatalog, + LakeCatalog.Context lakeContext, + ZooKeeperClient zkClient, + Clock clock) + throws Exception { + long tableId = tableInfo.getTableId(); + Optional lakeTableOptional = zkClient.getLakeTable(tableId); + Optional latestSnapshotInfoOptional = + lakeCatalog.getLatestSnapshotInfo(tablePath, lakeContext); + + checkSnapshotConsistency(tablePath, tableId, lakeTableOptional, latestSnapshotInfoOptional); + checkTtlRisk(tableInfo, latestSnapshotInfoOptional, clock); + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static void checkSnapshotConsistency( + TablePath tablePath, + long tableId, + Optional lakeTableOptional, + Optional latestSnapshotInfoOptional) + throws IOException { + if (!latestSnapshotInfoOptional.isPresent()) { + // No snapshot information from lake side. + // - If Fluss side also has no LakeTable, this is treated as first-time enable. + // - If Fluss side has LakeTable metadata but lake side has no snapshot info, + // consider it as an inconsistent state (for example, lake table or snapshots + // were removed manually). + if (!lakeTableOptional.isPresent()) { + return; + } + + throw new InvalidAlterTableException( + String.format( + "Cannot enable datalake for table %s because Fluss has lake " + + "snapshot metadata but the lake table has no snapshot information. " + + "This usually means the lake table or its snapshots were manually " + + "removed. Please recreate or clean the lake table and try again.", + tablePath)); + } + + LakeSnapshotInfo lakeSnapshotInfo = latestSnapshotInfoOptional.get(); + String flussOffsets = lakeSnapshotInfo.getFlussOffsetsProperty(); + + if (flussOffsets == null) { + // There is a snapshot but no fluss-offsets property; most likely the snapshot was not + // produced by Fluss tiering. + throw new InvalidAlterTableException( + String.format( + "Cannot enable datalake for table %s because existing lake table " + + "has snapshots without '%s' property. This usually means " + + "data was written directly into the lake table. Please recreate " + + "or clean the lake table and try again.", + tablePath, LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)); + } + + long tableIdFromOffsets = extractTableIdFromOffsetsProperty(flussOffsets); + if (tableIdFromOffsets != tableId) { + throw new InvalidAlterTableException( + String.format( + "Cannot enable datalake for table %s because the latest lake snapshot " + + "belongs to a different Fluss table (tableId in offsets path: %d, " + + "current tableId: %d). Please ensure the lake table is created " + + "and written by the same Fluss table.", + tablePath, tableIdFromOffsets, tableId)); + } + + // If Fluss side also has LakeTable, we can further verify snapshot id alignment. 
+ if (lakeTableOptional.isPresent()) { + LakeTable lakeTable = lakeTableOptional.get(); + LakeTableSnapshot snapshot = lakeTable.getOrReadLatestTableSnapshot(); + if (snapshot.getSnapshotId() != lakeSnapshotInfo.getSnapshotId()) { + throw new InvalidAlterTableException( + String.format( + "Cannot enable datalake for table %s because Fluss and lake " + + "have different latest snapshot ids (fluss: %d, lake: %d).", + tablePath, + snapshot.getSnapshotId(), + lakeSnapshotInfo.getSnapshotId())); + } + } + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static void checkTtlRisk( + TableInfo tableInfo, + Optional latestSnapshotInfoOptional, + Clock clock) { + TableConfig tableConfig = tableInfo.getTableConfig(); + long ttlMs = tableConfig.getLogTTLMs(); + if (ttlMs <= 0) { + // TTL disabled (e.g. -1), skip TTL checks. + return; + } + + if (!latestSnapshotInfoOptional.isPresent()) { + // No snapshot info from lake side, cannot judge TTL risk, skip. + return; + } + + LakeSnapshotInfo snapshotInfo = latestSnapshotInfoOptional.get(); + long snapshotTimeMillis = snapshotInfo.getCommitTimestampMillis(); + if (snapshotTimeMillis <= 0) { + // Unknown commit timestamp, skip TTL checks. + return; + } + + long now = clock.milliseconds(); + long diff = now - snapshotTimeMillis; + + if (diff > ttlMs) { + throw new InvalidAlterTableException( + String.format( + "Cannot enable datalake for table %s because the latest lake snapshot " + + "is older than table.log.ttl (snapshot age: %d ms, ttl: %d ms). " + + "Enabling datalake may cause tiering to fail when restoring from " + + "expired offsets.", + tableInfo.getTablePath(), diff, ttlMs)); + } + } + + public static long extractTableIdFromOffsetsProperty(String offsets) { + String trimmed = offsets.trim(); + if (trimmed.isEmpty()) { + throw new InvalidAlterTableException( + String.format( + "Invalid '%s' property: empty value.", + LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)); + } + + // v1: JSON format, deserialize TableBucketOffsets and read tableId. + if (trimmed.startsWith("{")) { + TableBucketOffsets tableBucketOffsets = + TableBucketOffsets.fromJsonBytes(trimmed.getBytes(StandardCharsets.UTF_8)); + return tableBucketOffsets.getTableId(); + } + + // v2: path format, e.g. + // {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/metadata/{UUID}.offsets + int metadataIdx = trimmed.indexOf("/metadata/"); + if (metadataIdx <= 0) { + throw new InvalidAlterTableException( + String.format( + "Invalid '%s' path: %s. Expected to contain '/metadata/'.", + LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, offsets)); + } + + String tableDir = trimmed.substring(0, metadataIdx); + int lastSlashIdx = tableDir.lastIndexOf('/'); + if (lastSlashIdx < 0 || lastSlashIdx == tableDir.length() - 1) { + throw new InvalidAlterTableException( + String.format( + "Invalid '%s' path: %s. Cannot extract table name part.", + LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, offsets)); + } + + String tableSegment = tableDir.substring(lastSlashIdx + 1); + int dashIdx = tableSegment.lastIndexOf('-'); + if (dashIdx < 0 || dashIdx == tableSegment.length() - 1) { + throw new InvalidAlterTableException( + String.format( + "Invalid '%s' path: %s. Cannot extract tableId part.", + LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, offsets)); + } + + try { + return Long.parseLong(tableSegment.substring(dashIdx + 1)); + } catch (NumberFormatException e) { + throw new InvalidAlterTableException( + String.format( + "Invalid '%s' path: %s. 
tableId is not a number.", + LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, offsets), + e); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index aa030d0256..02b7fdab1d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -21,6 +21,7 @@ import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; import org.apache.fluss.lake.lakestorage.LakeStorage; import org.apache.fluss.lake.lakestorage.LakeStoragePlugin; import org.apache.fluss.lake.source.LakeSource; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** A plugin of paimon just for testing purpose. */ public class TestingPaimonStoragePlugin implements LakeStoragePlugin { @@ -92,6 +94,12 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont // do nothing } + @Override + public Optional getLatestSnapshotInfo( + TablePath tablePath, Context context) { + return Optional.empty(); + } + public TableDescriptor getTable(TablePath tablePath) { return tableByPath.get(tablePath); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 43b12943d0..a7d579a9cc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -261,7 +261,7 @@ public void startCoordinatorServer() throws Exception { conf.setString(ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperServer.getConnectString()); conf.setString(ConfigOptions.BIND_LISTENERS, coordinatorServerListeners); setRemoteDataDir(conf); - coordinatorServer = new CoordinatorServer(conf); + coordinatorServer = new CoordinatorServer(conf, clock); coordinatorServer.start(); coordinatorServerInfo = // TODO, Currently, we use 0 as coordinator server id. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/DatalakeEnableValidatorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/DatalakeEnableValidatorTest.java new file mode 100644 index 0000000000..91a970ab28 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/DatalakeEnableValidatorTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.lake.lakestorage.LakeSnapshotInfo; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.data.lake.LakeTable; +import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.clock.SystemClock; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.fluss.server.utils.DatalakeEnableValidator.checkSnapshotConsistency; +import static org.apache.fluss.server.utils.DatalakeEnableValidator.checkTtlRisk; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link DatalakeEnableValidator}. */ +class DatalakeEnableValidatorTest { + + @Test + void testSnapshotConsistency() throws Exception { + TablePath tablePath = TablePath.of("db", "t"); + long tableId = 1L; + + // both Fluss side and lake side have no snapshot - should pass + checkSnapshotConsistency(tablePath, tableId, Optional.empty(), Optional.empty()); + + // no snapshot but has lake table - should fail + LakeTableSnapshot lakeTableSnapshot = new LakeTableSnapshot(1L, Collections.emptyMap()); + LakeTable lakeTable = new LakeTable(lakeTableSnapshot); + assertThatThrownBy( + () -> + checkSnapshotConsistency( + tablePath, + tableId, + Optional.of(lakeTable), + Optional.empty())) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("lake table has no snapshot information"); + + // table id mismatched - should fail + LakeSnapshotInfo snapshotWithWrongTableId = + new LakeSnapshotInfo( + 1L, + System.currentTimeMillis(), + "/remote/data/lake/db/t-2/metadata/uuid.offsets"); + assertThatThrownBy( + () -> + checkSnapshotConsistency( + tablePath, + tableId, + Optional.empty(), + Optional.of(snapshotWithWrongTableId))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("belongs to a different Fluss table"); + + // snapshot id mismatched - should fail + LakeSnapshotInfo snapshotWithWrongSnapshotId = + new LakeSnapshotInfo( + 2L, + System.currentTimeMillis(), + "/remote/data/lake/db/t-1/metadata/uuid.offsets"); + assertThatThrownBy( + () -> + checkSnapshotConsistency( + tablePath, + tableId, + Optional.of(lakeTable), + Optional.of(snapshotWithWrongSnapshotId))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("different latest snapshot ids"); + } + + @Test + void testTtlRisk() { + // within TTL - should pass + TableInfo tableInfo = createTableInfoWithTtl(Duration.ofHours(1)); + LakeSnapshotInfo snapshotInfo = new LakeSnapshotInfo(1L, System.currentTimeMillis(), null); + checkTtlRisk(tableInfo, Optional.of(snapshotInfo), SystemClock.getInstance()); + + // older than TTL - should fail + Duration ttl = Duration.ofMillis(10); + TableInfo tableInfoWithShortTtl = createTableInfoWithTtl(ttl); + long oldTimestamp = System.currentTimeMillis() - ttl.toMillis() * 10; + LakeSnapshotInfo oldSnapshot = new LakeSnapshotInfo(1L, oldTimestamp, null); + assertThatThrownBy( + () -> + checkTtlRisk( + tableInfoWithShortTtl, + Optional.of(oldSnapshot), + 
SystemClock.getInstance())) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("older than table.log.ttl"); + } + + private static TableInfo createTableInfoWithTtl(Duration ttl) { + Schema schema = Schema.newBuilder().column("a", DataTypes.INT()).build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .property(ConfigOptions.TABLE_LOG_TTL, ttl) + .build(); + + TablePath tablePath = TablePath.of("db", "t"); + return TableInfo.of( + tablePath, + 1L, + 1, + tableDescriptor, + System.currentTimeMillis(), + System.currentTimeMillis()); + } +}
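
For reference, a minimal sketch (not part of the patch) of how the v2 offsets path is parsed by
DatalakeEnableValidator.extractTableIdFromOffsetsProperty; the path and ids below are made up:

import static org.apache.fluss.server.utils.DatalakeEnableValidator.extractTableIdFromOffsetsProperty;

public class OffsetsPathExample {
    public static void main(String[] args) {
        // v2 format: {remote.data.dir}/lake/{db}/{table}-{tableId}/metadata/{UUID}.offsets;
        // the tableId is the suffix of the "{table}-{tableId}" segment before "/metadata/".
        long tableId =
                extractTableIdFromOffsetsProperty(
                        "/remote/data/lake/db1/orders-1001/metadata/3f2a8c.offsets");
        System.out.println(tableId); // prints 1001
    }
}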