diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index aaf06592e5..fca8d408cb 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -124,6 +124,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                     .comment("test table")
                     .distributedBy(3, "id")
                     .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
+                    .property(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION, 2)
                     .customProperty("connector", "fluss")
                     .build();
 
@@ -774,6 +775,14 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
         try (Connection conn = ConnectionFactory.createConnection(clientConf);
                 Admin admin = conn.getAdmin()) {
             TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
+            TableDescriptor expectedDescriptor =
+                    DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
+            Map<String, String> expectedProperties =
+                    new HashMap<>(expectedDescriptor.getProperties());
+            expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
+            expectedDescriptor = expectedDescriptor.withProperties(expectedProperties);
+
+            assertThat(tableInfo.toTableDescriptor()).isEqualTo(expectedDescriptor);
             assertThat(tableInfo.toTableDescriptor())
                     .isEqualTo(
                             DEFAULT_TABLE_DESCRIPTOR
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index c21c6a8bb4..1a82fd66fb 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -67,8 +67,10 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -103,6 +105,9 @@ void testGetDescriptor() throws Exception {
                 DATA1_TABLE_DESCRIPTOR_PK
                         .withReplicationFactor(3)
                         .withDataLakeFormat(DataLakeFormat.PAIMON);
+        Map<String, String> expectedProperties = new HashMap<>(expected.getProperties());
+        expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
+        expected = expected.withProperties(expectedProperties);
         assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected);
     }
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 7e04c3a1cc..76d42d22c1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1414,6 +1414,14 @@ public class ConfigOptions {
                     .withDescription(
                             "If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");
 
+    public static final ConfigOption<Integer> TABLE_DATALAKE_STORAGE_VERSION =
+            key("table.datalake.storage-version")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The storage version of the datalake table. This option is automatically set by the Fluss server "
+                                    + "and cannot be set by clients. Version 2 indicates a clean schema without system columns in the datalake.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 984a1def4a..65cb1c0e6e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -106,6 +106,11 @@ public boolean isDataLakeAutoExpireSnapshot() {
         return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
     }
 
+    /** Gets the data lake storage version of the table. Returns empty if not set (legacy table). */
+    public Optional<Integer> getDataLakeStorageVersion() {
+        return config.getOptional(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 4cbccb6c1f..85408eead3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -35,6 +35,8 @@
 @PublicEvolving
 public interface LakeCatalog extends AutoCloseable {
 
+    Integer CURRENT_LAKE_STORAGE_VERSION = 2;
+
     /**
      * Create a new table in lake.
      *
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 45ce7945ad..e554accf1d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -323,15 +323,23 @@ public boolean isBounded() {
             case TIMESTAMP:
                 offsetsInitializer =
                         OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
-                if (hasPrimaryKey()) {
-                    // Currently, for primary key tables, we do not consider lake data
-                    // when reading from a given timestamp. This is because we will need
-                    // to read the change log of primary key table.
-                    // TODO: consider support it using paimon change log data?
-                    enableLakeSource = false;
-                } else {
-                    if (enableLakeSource) {
-                        enableLakeSource = pushTimeStampFilterToLakeSource(lakeSource);
+                if (lakeSource != null) {
+                    if (!tableOptions.containsKey(
+                            ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key())) {
+                        enableLakeSource = handleLegacyV1LakeTable();
+                    } else {
+                        int lakeStorageVersion =
+                                Integer.parseInt(
+                                        tableOptions.get(
+                                                ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION
+                                                        .key()));
+                        if (lakeStorageVersion < 2) {
+                            throw new IllegalArgumentException(
+                                    "Unsupported lake storage version: " + lakeStorageVersion);
+                        } else {
+                            // todo: support map to partition in issue
+                            enableLakeSource = false;
+                        }
                     }
                 }
                 break;
@@ -384,6 +392,18 @@ public boolean isBounded() {
         }
     }
 
+    private boolean handleLegacyV1LakeTable() {
+        if (hasPrimaryKey()) {
+            // Currently, for primary key tables, we do not consider lake data
+            // when reading from a given timestamp. This is because we will need
+            // to read the change log of the primary key table.
+            // TODO: consider supporting it using paimon change log data?
+            return false;
+        } else {
+            return pushTimeStampFilterToLakeSource(lakeSource);
+        }
+    }
+
     private boolean pushTimeStampFilterToLakeSource(LakeSource lakeSource) {
         // will push timestamp to lake
         // we will have three additional system columns, __bucket, __offset, __timestamp
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index f42ba30ae7..14ce19207a 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -634,6 +634,7 @@ void testTableWithExpression() throws Exception {
         Map<String, String> expectedTableProperties = new HashMap<>();
         expectedTableProperties.put("table.datalake.format", "paimon");
         expectedTableProperties.put("table.replication.factor", "1");
+        expectedTableProperties.put("table.datalake.storage-version", "2");
         assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
 
         Map<String, String> expectedCustomProperties = new HashMap<>();
@@ -896,6 +897,7 @@ protected static void assertOptionsEqual(
             Map<String, String> actualOptions, Map<String, String> expectedOptions) {
         actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
         actualOptions.remove(ConfigOptions.TABLE_REPLICATION_FACTOR.key());
+        actualOptions.remove(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key());
         assertThat(actualOptions).isEqualTo(expectedOptions);
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
index a462b77b88..57ad4bfe82 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
@@ -26,6 +26,7 @@
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION;
 import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -140,6 +141,7 @@ private static void assertOptionsEqual(
         actualOptions.remove(TABLE_REPLICATION_FACTOR.key());
         // Remove datalake format (auto-added when datalake is enabled in Fluss cluster)
         actualOptions.remove(TABLE_DATALAKE_FORMAT.key());
+        actualOptions.remove(TABLE_DATALAKE_STORAGE_VERSION.key());
         assertThat(actualOptions).isEqualTo(expectedOptions);
     }
 }
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 500546e641..63ece3457e 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -112,7 +112,9 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
     public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
         try {
-            List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
+            Table paimonTable = getTable(tablePath);
+            List<SchemaChange> paimonSchemaChanges =
+                    toPaimonSchemaChanges(paimonTable, tableChanges);
 
             // Compare current Paimon table schema with expected target schema before altering
             if (shouldAlterTable(tablePath, tableChanges)) {
@@ -157,6 +159,14 @@ private boolean shouldAlterTable(TablePath tablePath, List tableCha
         }
     }
 
+    private Table getTable(TablePath tablePath) throws TableNotExistException {
+        try {
+            return paimonCatalog.getTable(toPaimon(tablePath));
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException("Table " + tablePath + " does not exist.");
+        }
+    }
+
     private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
         String columnName = addColumn.getName();
 
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
index 3af7467cfa..77cc91a2e6 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -94,10 +94,12 @@ private ReadBuilder applyProject(
         int timestampFieldPos = paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
 
         int[] paimonProject =
-                IntStream.concat(
-                                IntStream.of(projectIds),
-                                IntStream.of(offsetFieldPos, timestampFieldPos))
-                        .toArray();
+                hasSystemColumn(paimonFullRowType)
+                        ? IntStream.concat(
+                                        IntStream.of(projectIds),
+                                        IntStream.of(offsetFieldPos, timestampFieldPos))
+                                .toArray()
+                        : projectIds;
         return readBuilder.withProjection(paimonProject);
     }
 
@@ -110,17 +112,15 @@ public static class PaimonRowAsFlussRecordIterator implements CloseableIterator<
         private final ProjectedRow projectedRow;
         private final PaimonRowAsFlussRow paimonRowAsFlussRow;
 
-        private final int logOffsetColIndex;
-        private final int timestampColIndex;
-
         public PaimonRowAsFlussRecordIterator(
                 org.apache.paimon.utils.CloseableIterator<InternalRow> paimonRowIterator,
                 RowType paimonRowType) {
             this.paimonRowIterator = paimonRowIterator;
 
-            this.logOffsetColIndex = paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
-            this.timestampColIndex = paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
-            int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray();
+            int[] project =
+                    hasSystemColumn(paimonRowType)
+                            ? IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray()
+                            : IntStream.range(0, paimonRowType.getFieldCount()).toArray();
             projectedRow = ProjectedRow.from(project);
             paimonRowAsFlussRow = new PaimonRowAsFlussRow();
         }
@@ -143,14 +143,21 @@ public boolean hasNext() {
         public LogRecord next() {
             InternalRow paimonRow = paimonRowIterator.next();
             ChangeType changeType = toChangeType(paimonRow.getRowKind());
-            long offset = paimonRow.getLong(logOffsetColIndex);
-            long timestamp = paimonRow.getTimestamp(timestampColIndex, 6).getMillisecond();
 
             return new GenericRecord(
-                    offset,
-                    timestamp,
+                    -1L,
+                    -1L,
                     changeType,
                     projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow)));
         }
     }
+
+    // for legacy tables, we will have system columns
+    private static boolean hasSystemColumn(RowType paimonRowType) {
+        return paimonRowType
+                .getFields()
+                .get(paimonRowType.getFieldCount() - 1)
+                .name()
+                .equals(TIMESTAMP_COLUMN_NAME);
+    }
 }
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index c092fbdab4..8b72537b8b 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -31,6 +31,7 @@
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
 
+    private final boolean paimonIncludingSystemColumns;
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
@@ -39,15 +40,25 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
     private final int offsetFieldIndex;
     private final int timestampFieldIndex;
 
-    public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
+    public FlussRecordAsPaimonRow(
+            int bucket, RowType tableTowType, boolean paimonIncludingSystemColumns) {
         super(tableTowType);
         this.bucket = bucket;
-        this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
+        this.businessFieldCount =
+                tableRowType.getFieldCount()
+                        - (paimonIncludingSystemColumns ? SYSTEM_COLUMNS.size() : 0);
+        this.paimonIncludingSystemColumns = paimonIncludingSystemColumns;
+
+        // only valid when the paimon table includes system columns
        this.bucketFieldIndex = businessFieldCount;
         this.offsetFieldIndex = businessFieldCount + 1;
         this.timestampFieldIndex = businessFieldCount + 2;
     }
 
+    public FlussRecordAsPaimonRow(int bucket, RowType tableRowType) {
+        this(bucket, tableRowType, false);
+    }
+
     public void setFlussRecord(LogRecord logRecord) {
         this.logRecord = logRecord;
         this.internalRow = logRecord.getRow();
@@ -86,13 +97,21 @@ public boolean isNullAt(int pos) {
             // Padding NULL for missing business fields when Paimon schema is wider than Fluss
             return true;
         }
-        // is the last three system fields: bucket, offset, timestamp which are never null
-        return false;
+
+        if (paimonIncludingSystemColumns && pos < businessFieldCount + SYSTEM_COLUMNS.size()) {
+            // is one of the last three system fields: bucket, offset, timestamp, which are never null
+            return false;
+        }
+
+        // shouldn't happen in normal cases
+        throw new IllegalStateException(
+                String.format(
+                        "Field %s is NULL because Paimon schema is wider than Fluss record.", pos));
     }
 
     @Override
     public int getInt(int pos) {
-        if (pos == bucketFieldIndex) {
+        if (pos == bucketFieldIndex && paimonIncludingSystemColumns) {
             // bucket system column
             return bucket;
         }
@@ -107,12 +126,14 @@ public int getInt(int pos) {
 
     @Override
     public long getLong(int pos) {
-        if (pos == offsetFieldIndex) {
-            // offset system column
-            return logRecord.logOffset();
-        } else if (pos == timestampFieldIndex) {
-            // timestamp system column
-            return logRecord.timestamp();
+        if (paimonIncludingSystemColumns) {
+            if (pos == offsetFieldIndex) {
+                // offset system column
+                return logRecord.logOffset();
+            } else if (pos == timestampFieldIndex) {
+                // timestamp system column
+                return logRecord.timestamp();
+            }
         }
         if (pos >= originRowFieldCount) {
             throw new IllegalStateException(
@@ -126,8 +147,8 @@ public long getLong(int pos) {
 
     @Override
     public Timestamp getTimestamp(int pos, int precision) {
-        // it's timestamp system column
-        if (pos == timestampFieldIndex) {
+        if (paimonIncludingSystemColumns && pos == timestampFieldIndex) {
+            // it's the timestamp system column
             return Timestamp.fromEpochMillis(logRecord.timestamp());
         }
         if (pos >= originRowFieldCount) {
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
index 8472b825b0..79f009dcdc 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
@@ -53,18 +53,29 @@ public PaimonLakeWriter(
 
         List<String> partitionKeys = fileStoreTable.partitionKeys();
 
+        // currently, when the storage version is not present, the corresponding
+        // paimon table must include system columns
+        boolean paimonIncludeSystemColumns =
+                !writerInitContext
+                        .tableInfo()
+                        .getTableConfig()
+                        .getDataLakeStorageVersion()
+                        .isPresent();
+
         this.recordWriter =
                 fileStoreTable.primaryKeys().isEmpty()
                         ? new AppendOnlyWriter(
                                 fileStoreTable,
                                 writerInitContext.tableBucket(),
                                 writerInitContext.partition(),
-                                partitionKeys)
+                                partitionKeys,
+                                paimonIncludeSystemColumns)
                         : new MergeTreeWriter(
                                 fileStoreTable,
                                 writerInitContext.tableBucket(),
                                 writerInitContext.partition(),
-                                partitionKeys);
+                                partitionKeys,
+                                paimonIncludeSystemColumns);
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index 413b141898..b7d5049284 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -46,7 +46,8 @@ public RecordWriter(
             RowType tableRowType,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            boolean paimonIncludingSystemColumns) {
         this.tableWrite = tableWrite;
         this.tableRowType = tableRowType;
         this.bucket = tableBucket.getBucket();
@@ -56,7 +57,8 @@ public RecordWriter(
             this.partition = BinaryRow.EMPTY_ROW;
         }
         this.flussRecordAsPaimonRow =
-                new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
+                new FlussRecordAsPaimonRow(
+                        tableBucket.getBucket(), tableRowType, paimonIncludingSystemColumns);
     }
 
     public abstract void write(LogRecord record) throws Exception;
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index 7daec20b7a..c3d045de1f 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -41,7 +41,8 @@ public AppendOnlyWriter(
             FileStoreTable fileStoreTable,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            boolean paimonIncludingSystemColumns) {
         //noinspection unchecked
         super(
                 (TableWriteImpl)
                 fileStoreTable.rowType(),
                 tableBucket,
                 partition,
-                partitionKeys); // Pass to parent
+                partitionKeys,
+                paimonIncludingSystemColumns); // Pass to parent
         this.fileStoreTable = fileStoreTable;
     }
 
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index 95a5275187..14bef09a80 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -49,13 +49,15 @@ public MergeTreeWriter(
             FileStoreTable fileStoreTable,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            boolean paimonIncludingSystemColumns) {
         super(
                 createTableWrite(fileStoreTable),
                 fileStoreTable.rowType(),
                 tableBucket,
                 partition,
-                partitionKeys);
+                partitionKeys,
+                paimonIncludingSystemColumns);
         this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
     }
 
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index ded40ac59b..86b8b2ce79 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.paimon.utils;
 
 import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
@@ -33,6 +34,7 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -41,9 +43,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION;
 import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 
 /** Utils for conversion between Paimon and Fluss. */
 public class PaimonConversions {
@@ -113,7 +118,8 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
                 .getFieldOrNull(flussRowAsPaimonRow);
     }
 
-    public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> tableChanges) {
+    public static List<SchemaChange> toPaimonSchemaChanges(
+            Table paimonTable, List<TableChange> tableChanges) {
         List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size());
 
         for (TableChange tableChange : tableChanges) {
@@ -145,14 +151,24 @@ public static List toPaimonSchemaChanges(List tableCh
                 org.apache.paimon.types.DataType paimonDataType =
                         flussDataType.accept(FlussDataTypeToPaimonDataType.INSTANCE);
 
-                String firstSystemColumnName = SYSTEM_COLUMNS.keySet().iterator().next();
-                schemaChanges.add(
-                        SchemaChange.addColumn(
-                                addColumn.getName(),
-                                paimonDataType,
-                                addColumn.getComment(),
-                                SchemaChange.Move.before(
-                                        addColumn.getName(), firstSystemColumnName)));
+                if (paimonTable.rowType().getFieldIndex(TIMESTAMP_COLUMN_NAME) >= 0) {
+                    // must be a legacy v1 table with system columns
+                    String firstSystemColumnName = SYSTEM_COLUMNS.keySet().iterator().next();
+                    schemaChanges.add(
+                            SchemaChange.addColumn(
+                                    addColumn.getName(),
+                                    paimonDataType,
+                                    addColumn.getComment(),
+                                    SchemaChange.Move.before(
+                                            addColumn.getName(), firstSystemColumnName)));
+                } else {
+                    schemaChanges.add(
+                            SchemaChange.addColumn(
+                                    addColumn.getName(),
+                                    paimonDataType,
+                                    addColumn.getComment(),
+                                    SchemaChange.Move.last(addColumn.getName())));
+                }
             } else {
                 throw new UnsupportedOperationException(
                         "Unsupported table change: " + tableChange.getClass());
@@ -206,9 +222,15 @@ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
                     column.getComment().orElse(null));
         }
 
-        // add system metadata columns to schema
-        for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
-            schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
+        // add system metadata columns to schema only for legacy tables (storage-version not set)
+        Optional<Integer> storageVersion =
+                Configuration.fromMap(tableDescriptor.getProperties())
+                        .getOptional(TABLE_DATALAKE_STORAGE_VERSION);
+        if (!storageVersion.isPresent()) {
+            // Legacy table: add system columns
+            for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
+                schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
+            }
         }
 
         // set pk
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 6fb7003223..6128d6b94c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -29,6 +29,7 @@
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
@@ -49,6 +50,7 @@
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.AfterEach;
@@ -67,6 +69,7 @@
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import static org.apache.fluss.lake.paimon.testutils.PaimonTestUtils.adjustToLegacyV1Table;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -164,18 +167,10 @@ void testCreateLakeEnabledTable() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
                         new String[] {
-                            "log_c1",
-                            "log_c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "log_c1", "log_c2",
                         }),
                 "log_c1,log_c2",
                 BUCKET_NUM);
@@ -202,18 +197,10 @@ void testCreateLakeEnabledTable() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
                         new String[] {
-                            "log_c1",
-                            "log_c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "log_c1", "log_c2",
                         }),
                 null,
                 BUCKET_NUM);
@@ -241,19 +228,9 @@ void testCreateLakeEnabledTable() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT().notNull(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
-                        new String[] {
-                            "pk_c1",
-                            "pk_c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
-                        }),
+                        new String[] {"pk_c1", "pk_c2"}),
                 "pk_c1",
                 BUCKET_NUM);
@@ -284,19 +261,10 @@ void testCreateLakeEnabledTable() throws Exception {
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT().notNull(),
                             org.apache.paimon.types.DataTypes.STRING(),
-                            org.apache.paimon.types.DataTypes.STRING().notNull(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING().notNull()
                         },
                         new String[] {
-                            "c1",
-                            "c2",
-                            "c3",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "c1", "c2", "c3",
                         }),
                 "c1",
                 BUCKET_NUM);
@@ -352,31 +320,11 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
                             org.apache.paimon.types.DataTypes.TIME(),
                             org.apache.paimon.types.DataTypes.TIMESTAMP(),
                             org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
-                            "log_c1",
-                            "log_c2",
-                            "log_c3",
-                            "log_c4",
-                            "log_c5",
-                            "log_c6",
-                            "log_c7",
-                            "log_c8",
-                            "log_c9",
-                            "log_c10",
-                            "log_c11",
-                            "log_c12",
-                            "log_c13",
-                            "log_c14",
-                            "log_c15",
-                            "log_c16",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "log_c1", "log_c2", "log_c3", "log_c4", "log_c5", "log_c6", "log_c7",
+                            "log_c8", "log_c9", "log_c10", "log_c11", "log_c12", "log_c13",
+                            "log_c14", "log_c15", "log_c16",
                         }),
                 null,
                 BUCKET_NUM);
@@ -456,8 +404,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1, fluss.table.datalake.storage-version=2}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1, fluss.table.datalake.storage-version=2}, comment=null}. "
                                 + "Please first drop the table in Paimon catalog or use a new table name.");
 
         // create log table with different fields will throw exception
@@ -475,8 +423,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1, fluss.table.datalake.storage-version=2}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1, fluss.table.datalake.storage-version=2}, comment=null}. "
                                 + "Please first drop the table in Paimon catalog or use a new table name.");
 
         // add an insignificant option to Paimon table will be ok
@@ -611,19 +559,9 @@ void testAlterLakeEnabledLogTable() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
-                        new String[] {
-                            "log_c1",
-                            "log_c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
-                        }),
+                        new String[] {"log_c1", "log_c2"}),
                 "log_c1,log_c2",
                 BUCKET_NUM);
@@ -708,18 +646,8 @@ void testAlterLakeEnabledTableProperties() throws Exception {
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
-                        new String[] {
-                            "c1",
-                            "c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
-                        }),
+                        new String[] {"c1", "c2"}),
                 "c1,c2",
                 BUCKET_NUM);
@@ -738,18 +666,10 @@ void testAlterLakeEnabledTableProperties() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
                         new String[] {
-                            "c1",
-                            "c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "c1", "c2",
                         }),
                 "c1,c2",
                 BUCKET_NUM);
@@ -829,17 +749,9 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
-                            "c1",
-                            "c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
+                            "c1", "c2",
                         }),
                 "c1,c2",
                 BUCKET_NUM);
@@ -881,19 +793,9 @@ void testAlterLakeEnabledTableSchema() throws Exception {
                 RowType.of(
                         new DataType[] {
                             org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.STRING(),
-                            // for __bucket, __offset, __timestamp
-                            org.apache.paimon.types.DataTypes.INT(),
-                            org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                            org.apache.paimon.types.DataTypes.STRING()
                         },
-                        new String[] {
-                            "c1",
-                            "c2",
-                            BUCKET_COLUMN_NAME,
-                            OFFSET_COLUMN_NAME,
-                            TIMESTAMP_COLUMN_NAME
-                        }),
+                        new String[] {"c1", "c2"}),
                 "c1,c2",
                 BUCKET_NUM);
@@ -912,15 +814,8 @@ void testAlterLakeEnabledTableSchema() throws Exception {
                 paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName()));
         // Verify the new column c3 with comment was added to Paimon table
         RowType alteredRowType = alteredPaimonTable.rowType();
-        assertThat(alteredRowType.getFieldCount()).isEqualTo(6);
-        assertThat(alteredRowType.getFieldNames())
-                .containsExactly(
-                        "c1",
-                        "c2",
-                        "c3",
-                        BUCKET_COLUMN_NAME,
-                        OFFSET_COLUMN_NAME,
-                        TIMESTAMP_COLUMN_NAME);
+        assertThat(alteredRowType.getFieldCount()).isEqualTo(3);
+        assertThat(alteredRowType.getFieldNames()).containsExactly("c1", "c2", "c3");
         // Verify c3 column has the correct type and comment
         assertThat(alteredRowType.getField("c3").type())
                 .isEqualTo(org.apache.paimon.types.DataTypes.INT());
@@ -933,20 +828,21 @@ void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception {
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
                         .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
+                        .distributedBy(3)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                         .build();
         admin.createTable(tablePath, tableDescriptor, false).get();
-
-        Identifier paimonIdentifier = Identifier.create(DATABASE, tablePath.getTableName());
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        long tableId = tableInfo.getTableId();
 
         // alter to TIMESTAMP_WITH_LOCAL_TIME_ZONE to mock the legacy behavior
-        paimonCatalog.alterTable(
-                paimonIdentifier,
-                SchemaChange.updateColumnType(
-                        TIMESTAMP_COLUMN_NAME,
-                        org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
-                false);
+        adjustToLegacyV1Table(
+                tablePath,
+                tableId,
+                tableInfo.toTableDescriptor(),
+                paimonCatalog,
+                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient());
 
         // disable data lake
         admin.alterTable(
@@ -969,6 +865,25 @@ void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception {
                 .get();
         assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
                 .isTrue();
+
+        // verify we can still alter the table to add a new column
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.addColumn(
+                                        "col_add1",
+                                        DataTypes.STRING(),
+                                        null,
+                                        TableChange.ColumnPosition.last())),
+                        false)
+                .get();
+
+        Table table = paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName()));
+        RowType rowType = table.rowType();
+        // get the last column except for the system columns
+        DataField dataField = rowType.getFields().get(rowType.getFieldCount() - 4);
+        assertThat(dataField.name()).isEqualTo("col_add1");
+        assertThat(dataField.type()).isEqualTo(org.apache.paimon.types.DataTypes.STRING());
     }
 
     private void verifyPaimonTable(
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
index 9cab9c7d46..730cf8bcb8 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
@@ -23,6 +23,7 @@
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.replica.Replica;
@@ -47,6 +48,7 @@
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.lake.paimon.testutils.PaimonTestUtils.adjustToLegacyV1Table;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 
 /** The ITCase for Flink union read from a timestamp. */
@@ -89,15 +91,24 @@ protected FlussClusterExtension getFlussClusterExtension() {
     }
 
     @Test
-    void testUnionReadFromTimestamp() throws Exception {
+    void testUnionReadFromTimestampWithLegacyVersion() throws Exception {
         // first of all, start tiering
+        String tableName = "logTable_read_timestamp";
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        long tableId = createLogTable(tablePath, 1);
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+        // adjust to the legacy version to verify legacy-version behavior
+        adjustToLegacyV1Table(
+                tablePath,
+                tableId,
+                tableInfo.toTableDescriptor(),
+                paimonCatalog,
+                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient());
+
         JobClient jobClient = buildTieringJob(execEnv);
+        TableBucket t1Bucket = new TableBucket(tableId, 0);
         try {
-            String tableName = "logTable_read_timestamp";
-            TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
-            long tableId = createLogTable(tablePath, 1);
-            TableBucket t1Bucket = new TableBucket(tableId, 0);
-
             List<InternalRow> rows = new ArrayList<>();
             for (int i = 0; i < 10; i++) {
                 rows.addAll(writeRows(tablePath, 3));
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 41f18b8c6e..6416a81b11 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -68,6 +68,7 @@
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.lake.paimon.testutils.PaimonTestUtils.adjustToLegacyV1Table;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -110,31 +111,19 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
 
         // will read paimon snapshot, won't merge log since it's empty
         List<String> resultEmptyLog = toSortedRows(batchTEnv.executeSql("select * from " + tableName));
-        String expetedResultFromPaimon = buildExpectedResult(isPartitioned, partitions, 0, 1);
-        assertThat(resultEmptyLog.toString().replace("+U", "+I"))
-                .isEqualTo(expetedResultFromPaimon);
+        List<String> expectedResultFromPaimon = buildExpectedResult(isPartitioned, partitions, 0, 1);
+        assertThat(resultEmptyLog).containsExactlyInAnyOrderElementsOf(expectedResultFromPaimon);
 
         // read paimon directly using $lake
         TableResult tableResult =
                 batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        // paimon's source will emit +U[0, v0, xx] instead of +I[0, v0, xx], so
+        // replace +U with +I to make it equal
         List<String> paimonSnapshotRows =
                 CollectionUtil.iteratorToList(tableResult.collect()).stream()
-                        .map(
-                                row -> {
-                                    int userColumnCount = row.getArity() - 3;
-                                    Object[] fields = new Object[userColumnCount];
-                                    for (int i = 0; i < userColumnCount; i++) {
-                                        fields[i] = row.getField(i);
-                                    }
-                                    return Row.of(fields);
-                                })
-                        .map(Row::toString)
-                        .sorted()
+                        .map(row -> row.toString().replace("+U", "+I"))
                         .collect(Collectors.toList());
-        // paimon's source will emit +U[0, v0, xx] instead of +I[0, v0, xx], so
-        // replace +U with +I to make it equal
-        assertThat(paimonSnapshotRows.toString().replace("+U", "+I"))
-                .isEqualTo(expetedResultFromPaimon);
+        assertThat(paimonSnapshotRows).containsExactlyInAnyOrderElementsOf(expectedResultFromPaimon);
 
         // test point query with fluss
         String queryFilterStr = "c4 = 30";
@@ -273,15 +262,6 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
                                                         tableName, queryFilterStr))
                                         .collect())
                         .stream()
-                        .map(
-                                row -> {
-                                    int columnCount = row.getArity() - 3;
-                                    Object[] fields = new Object[columnCount];
-                                    for (int i = 0; i < columnCount; i++) {
-                                        fields[i] = row.getField(i);
-                                    }
-                                    return Row.of(fields);
-                                })
                         .map(Row::toString)
                         .sorted()
                         .collect(Collectors.toList());
@@ -405,7 +385,11 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
 
         // now, query the result, it must be the union result of lake snapshot and log
         List<String> result = toSortedRows(batchTEnv.executeSql("select * from " + tableName));
-        String expectedResult = buildExpectedResult(isPartitioned, partitions, 0, 2);
+        String expectedResult =
+                buildExpectedResult(isPartitioned, partitions, 0, 2).stream()
+                        .sorted()
+                        .collect(Collectors.toList())
+                        .toString();
         assertThat(result.toString().replace("+U", "+I")).isEqualTo(expectedResult);
 
         // query with project push down
@@ -430,6 +414,46 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
         assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
     }
 
+    @Test
+    void testUnionReadLegacyTable() throws Exception {
+        String tableName = "legacy_table";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        long tableId = createSimplePkTable(t1, 3, false, true);
+        adjustToLegacyV1Table(
+                t1,
+                tableId,
+                admin.getTableInfo(t1).get().toTableDescriptor(),
+                paimonCatalog,
+                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient());
+
+        JobClient jobClient = buildTieringJob(execEnv);
+        List<InternalRow> rows = new ArrayList<>();
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        for (int i = 0; i < 3; i++) {
+            rows.add(
+                    GenericRow.of(
+                            i, BinaryString.fromString("v1_1"), BinaryString.fromString("v1_2")));
+            bucketLogEndOffset.put(new TableBucket(tableId, i), 1L);
+        }
+        writeRows(t1, rows, false);
+        assertReplicaStatus(bucketLogEndOffset);
+
+        // cancel tiering job
+        jobClient.cancel().get();
+
+        rows.clear();
+        for (int i = 0; i < 3; i++) {
+            rows.add(
+                    GenericRow.of(
+                            i, BinaryString.fromString("v2_1"), BinaryString.fromString("v2_2")));
+        }
+        writeRows(t1, rows, false);
+
+        List<String> result = toSortedRows(batchTEnv.executeSql("select * from " + tableName));
+        assertThat(result.toString())
+                .isEqualTo("[+I[0, v2_1, v2_2], +I[1, v2_1, v2_2], +I[2, v2_1, v2_2]]");
+    }
+
     @Test
     void testUnionReadWhenSomeBucketNotTiered() throws Exception {
         // first of all, start tiering
@@ -1132,7 +1156,7 @@ private Map<TableBucket, Long> getBucketLogEndOffset(
         return bucketLogEndOffsets;
     }
 
-    private String buildExpectedResult(
+    private List<String> buildExpectedResult(
             boolean isPartitioned, List<String> partitions, int record1, int record2) {
         List<String> records = new ArrayList<>();
         records.add(
@@ -1158,15 +1182,13 @@ private String buildExpectedResult(
                         + "[5, 6, 7, 8], [2.1, 2.2, 2.3], +I[300, nested_value_3, 9.99], {key5=5, key6=6}, %s]");
 
         if (isPartitioned) {
-            return String.format(
-                    "[%s, %s, %s, %s]",
+            return Arrays.asList(
                     String.format(records.get(record1), partitions.get(0)),
                     String.format(records.get(record1), partitions.get(1)),
                     String.format(records.get(record2), partitions.get(0)),
                    String.format(records.get(record2), partitions.get(1)));
         } else {
-            return String.format(
-                    "[%s, %s]",
+            return Arrays.asList(
                     String.format(records.get(record1), "null"),
                     String.format(records.get(record2), "null"));
         }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/PaimonTestUtils.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/PaimonTestUtils.java
new file mode 100644
index 0000000000..09330b8c67
--- /dev/null
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/PaimonTestUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.testutils;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.TableRegistration;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** The utils for paimon testing. */
+public class PaimonTestUtils {
+
+    /** Adjusts the table to a legacy v1 table with system columns. */
+    public static void adjustToLegacyV1Table(
+            TablePath tablePath,
+            long tableId,
+            TableDescriptor tableDescriptor,
+            Catalog paimonCatalog,
+            ZooKeeperClient zkClient)
+            throws Exception {
+        // first, clear the lake storage version option
+        Map<String, String> props = new HashMap<>(tableDescriptor.getProperties());
+        props.remove(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key());
+        props.keySet().removeIf(k -> k.startsWith("table.datalake.paimon."));
+        TableDescriptor newTableDescriptor = tableDescriptor.withProperties(props);
+        TableRegistration tableRegistration =
+                TableRegistration.newTable(tableId, newTableDescriptor);
+        zkClient.updateTable(tablePath, tableRegistration);
+
+        // then, alter the paimon table to add the three system columns
+        paimonCatalog.alterTable(
+                toPaimon(tablePath),
+                Arrays.asList(
+                        SchemaChange.addColumn(BUCKET_COLUMN_NAME, DataTypes.INT()),
+                        SchemaChange.addColumn(OFFSET_COLUMN_NAME, DataTypes.BIGINT()),
+                        SchemaChange.addColumn(
+                                TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS())),
+                false);
+    }
+}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
index 15f6179711..7dff4f51bf 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
@@ -29,7 +29,6 @@
 
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.Test;
@@ -68,11 +67,7 @@ void testLogTableRecordAllTypes() {
                         new org.apache.paimon.types.LocalZonedTimestampType(6),
                         new org.apache.paimon.types.TimestampType(6),
                         new org.apache.paimon.types.BinaryType(),
-                        new org.apache.paimon.types.VarCharType(),
-                        // append three system columns: __bucket, __offset,__timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                        new org.apache.paimon.types.VarCharType());
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -120,28 +115,14 @@ void testLogTableRecordAllTypes() {
         assertThat(flussRecordAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 3, 4});
         assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue();
 
-        // verify FlussRecordAsPaimonRow system columns (no partition fields, so indices stay same)
-        assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(15)).isEqualTo(logOffset);
-        assertThat(flussRecordAsPaimonRow.getLong(16)).isEqualTo(timeStamp);
-        assertThat(flussRecordAsPaimonRow.getTimestamp(16, 4))
-                .isEqualTo(Timestamp.fromEpochMillis(timeStamp));
-
         assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
-
-        assertThat(flussRecordAsPaimonRow.getFieldCount())
-                .isEqualTo(14 + 3); // business + system = 14 + 0 + 3 = 17
+        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(14);
     }
 
     @Test
     void testPrimaryKeyTableRecord() {
         int tableBucket = 0;
-        RowType tableRowType =
-                RowType.of(
-                        new org.apache.paimon.types.BooleanType(),
-                        // append three system columns: __bucket, __offset,__timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+        RowType tableRowType = RowType.of(new org.apache.paimon.types.BooleanType());
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
         long logOffset = 0;
@@ -153,8 +134,7 @@ void testPrimaryKeyTableRecord() {
         flussRecordAsPaimonRow.setFlussRecord(logRecord);
 
         assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
-        // normal columns + system columns
-        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(4);
+        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(1);
 
         // verify rowkind
         assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
@@ -178,11 +158,7 @@ void testArrayTypeWithIntElements() {
                 RowType.of(
                         new org.apache.paimon.types.IntType(),
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.IntType()),
-                        // system columns: __bucket, __offset, __timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.IntType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -204,11 +180,6 @@ void testArrayTypeWithIntElements() {
         assertThat(array.getInt(2)).isEqualTo(3);
         assertThat(array.getInt(3)).isEqualTo(4);
         assertThat(array.getInt(4)).isEqualTo(5);
-
-        // Verify system columns are still accessible
-        assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
-        assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
     }
 
     @Test
@@ -217,11 +188,7 @@ void testArrayTypeWithStringElements() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.VarCharType()),
-                        // system columns: __bucket, __offset, __timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.VarCharType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -252,11 +219,7 @@ void testNestedArrayType() {
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
                                 new org.apache.paimon.types.ArrayType(
-                                        new org.apache.paimon.types.IntType())),
-                        // system columns: __bucket, __offset, __timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                        new org.apache.paimon.types.IntType())));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -306,11 +269,7 @@ void testArrayWithAllPrimitiveTypes() {
                         new org.apache.paimon.types.ArrayType(
                                 new org.apache.paimon.types.FloatType()),
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.DoubleType()),
-                        // system columns: __bucket, __offset, __timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.DoubleType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -367,10 +326,6 @@ void testArrayWithAllPrimitiveTypes() {
         InternalArray doubleArray = flussRecordAsPaimonRow.getArray(6);
         assertThat(doubleArray.getDouble(0)).isEqualTo(1.11);
         assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33});
-
-        // Verify system columns
-        assertThat(flussRecordAsPaimonRow.getInt(7)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(8)).isEqualTo(logOffset);
     }
 
     @Test
@@ -379,11 +334,7 @@ void testArrayWithDecimalElements() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.DecimalType(10, 2)),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.DecimalType(10, 2)));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -413,11 +364,7 @@ void testArrayWithTimestampElements() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.TimestampType(3)),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.TimestampType(3)));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -447,11 +394,7 @@ void testArrayWithBinaryElements() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.VarBinaryType()),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.VarBinaryType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -476,11 +419,7 @@ void testNullArray() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType())
-                                .nullable(),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                .nullable());
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -501,11 +440,7 @@ void testArrayWithNullableElements() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.IntType().nullable()),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.IntType().nullable()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -530,11 +465,7 @@ void testEmptyArray() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.ArrayType(
-                                new org.apache.paimon.types.IntType()),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.IntType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -557,11 +488,7 @@ void testPaimonSchemaWiderThanFlussRecord() {
         RowType tableRowType =
                 RowType.of(
                         new org.apache.paimon.types.BooleanType(),
-                        new org.apache.paimon.types.VarCharType(),
-                        // append three system columns: __bucket, __offset,__timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                        new org.apache.paimon.types.VarCharType());
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -573,26 +500,16 @@ void testPaimonSchemaWiderThanFlussRecord() {
         LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
         flussRecordAsPaimonRow.setFlussRecord(logRecord);
 
-        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(2);
 
         assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
         assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
-        assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
-        assertThat(flussRecordAsPaimonRow.getTimestamp(4, 3))
-                .isEqualTo(Timestamp.fromEpochMillis(timeStamp));
     }
 
     @Test
     void testFlussRecordWiderThanPaimonSchema() {
         int tableBucket = 0;
-        RowType tableRowType =
-                RowType.of(
-                        new org.apache.paimon.types.BooleanType(),
-                        // append three system columns: __bucket, __offset,__timestamp
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+        RowType tableRowType = RowType.of(new org.apache.paimon.types.BooleanType());
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -683,11 +600,7 @@ void testNestedRowType() {
                         rowWithArrayType,
                         new org.apache.paimon.types.ArrayType(arrayElementRowType),
                         simpleNestedRowType.nullable(),
-                        nullableFieldsRowType,
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                        nullableFieldsRowType);
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -820,11 +733,6 @@ void testNestedRowType() {
         assertThat(paimonNullableFieldsRow).isNotNull();
         assertThat(paimonNullableFieldsRow.getInt(0)).isEqualTo(42);
         assertThat(paimonNullableFieldsRow.isNullAt(1)).isTrue();
-
-        // Verify system columns
-        assertThat(flussRecordAsPaimonRow.getInt(8)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(9)).isEqualTo(logOffset);
-        assertThat(flussRecordAsPaimonRow.getLong(10)).isEqualTo(timeStamp);
     }
 
     @Test
@@ -990,11 +898,7 @@ void testNullMap() {
                         new org.apache.paimon.types.MapType(
                                         new org.apache.paimon.types.IntType(),
                                         new org.apache.paimon.types.IntType())
-                                .nullable(),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                .nullable());
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -1016,11 +920,7 @@ void testMapWithNullableValues() {
                 RowType.of(
                         new org.apache.paimon.types.MapType(
                                 new org.apache.paimon.types.IntType(),
-                                new org.apache.paimon.types.IntType().nullable()),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.IntType().nullable()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
@@ -1051,11 +951,7 @@ void testEmptyMap() {
                 RowType.of(
                         new org.apache.paimon.types.MapType(
                                 new org.apache.paimon.types.IntType(),
-                                new org.apache.paimon.types.IntType()),
-                        // system columns
-                        new org.apache.paimon.types.IntType(),
-                        new org.apache.paimon.types.BigIntType(),
-                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+                                new org.apache.paimon.types.IntType()));
 
         FlussRecordAsPaimonRow flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket, tableRowType);
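The net effect of the test changes above: under storage version 2, FlussRecordAsPaimonRow exposes exactly the business columns of the Fluss row, and the former __bucket/__offset/__timestamp fields disappear from the row entirely. A minimal sketch of the pattern these tests exercise — the GenericRow arity constructor and setField call are assumptions inferred from the usages visible in this patch:

    // One business column in Fluss maps to exactly one Paimon field under v2.
    RowType rowType = RowType.of(new org.apache.paimon.types.BooleanType());
    FlussRecordAsPaimonRow row = new FlussRecordAsPaimonRow(/* bucket */ 0, rowType);

    GenericRow genericRow = new GenericRow(1); // assumed arity constructor
    genericRow.setField(0, true);
    row.setFlussRecord(
            new GenericRecord(/* offset */ 0L, /* timestamp */ 1000L, APPEND_ONLY, genericRow));

    assertThat(row.getFieldCount()).isEqualTo(1); // no trailing system columns
    assertThat(row.getBoolean(0)).isTrue();

Bucket, offset and timestamp are accordingly no longer asserted per row; the tiering tests below instead verify offsets at the snapshot level via checkFlussOffsetsInSnapshot.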
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 19cbeb5585..a12ae2978c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -135,7 +135,7 @@ void testTiering() throws Exception {
                 .isGreaterThan(-1);
 
         // check data in paimon
-        checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+        checkDataInPaimonAppendOnlyTable(t2, flussRows);
 
         // then write data to the pk tables
         // write records
@@ -179,8 +179,7 @@ void testTiering() throws Exception {
                 checkDataInPaimonAppendOnlyPartitionedTable(
                         partitionedTablePath,
                         Collections.singletonMap(partitionCol, partitionName),
-                        writtenRowsByPartition.get(partitionName),
-                        0);
+                        writtenRowsByPartition.get(partitionName));
             }
             checkFlussOffsetsInSnapshot(partitionedTablePath, expectedOffsets);
         } finally {
@@ -374,7 +373,7 @@ void testTieringForAlterTable() throws Exception {
         assertReplicaStatus(t2Bucket, 30);
 
         // check data in paimon
-        checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+        checkDataInPaimonAppendOnlyTable(t2, flussRows);
 
         // then write data to the pk tables
         // write records
@@ -424,8 +423,7 @@ void testTieringForAlterTable() throws Exception {
                 checkDataInPaimonAppendOnlyPartitionedTable(
                         partitionedTablePath,
                         Collections.singletonMap(partitionCol, partitionName),
-                        writtenRowsByPartition.get(partitionName),
-                        0);
+                        writtenRowsByPartition.get(partitionName));
             }
             checkFlussOffsetsInSnapshot(partitionedTablePath, expectedOffset);
 
@@ -494,8 +492,7 @@ private Tuple2 createPartitionedTable(
     }
 
     private void checkDataInPaimonAppendOnlyTable(
-            TablePath tablePath, List<InternalRow> expectedRows, long startingOffset)
-            throws Exception {
+            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
         Iterator<InternalRow> paimonRowIterator = getPaimonRowCloseableIterator(tablePath);
         Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
@@ -504,18 +501,12 @@ private void checkDataInPaimonAppendOnlyTable(
             InternalRow flussRow = flussRowIterator.next();
             assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
             assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
-            // system columns are always the last three: __bucket, __offset, __timestamp
-            int offsetIndex = row.getFieldCount() - 2;
-            assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++);
         }
         assertThat(flussRowIterator.hasNext()).isFalse();
     }
 
     private void checkDataInPaimonAppendOnlyPartitionedTable(
-            TablePath tablePath,
-            Map<String, String> partitionSpec,
-            List<InternalRow> expectedRows,
-            long startingOffset)
+            TablePath tablePath, Map<String, String> partitionSpec, List<InternalRow> expectedRows)
             throws Exception {
         Iterator<InternalRow> paimonRowIterator =
                 getPaimonRowCloseableIterator(tablePath, partitionSpec);
@@ -526,8 +517,6 @@ private void checkDataInPaimonAppendOnlyPartitionedTable(
             assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
             assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
             assertThat(row.getString(2).toString()).isEqualTo(flussRow.getString(2).toString());
-            // the idx 3 is __bucket, so use 4
-            assertThat(row.getLong(4)).isEqualTo(startingOffset++);
         }
         assertThat(flussRowIterator.hasNext()).isFalse();
     }
@@ -594,9 +583,8 @@ void testTieringWithAddColumn() throws Exception {
         FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
         List<String> fieldNames = paimonTable.rowType().getFieldNames();
-        // Should have exact fields in order: a, b, c3, __bucket, __offset, __timestamp
-        assertThat(fieldNames)
-                .containsExactly("a", "b", "c3", "__bucket", "__offset", "__timestamp");
+        // Should have exact fields in order: a, b, c3
+        assertThat(fieldNames).containsExactly("a", "b", "c3");
 
         // 9. Verify both schema evolution and data correctness
         // For initial rows (before ADD COLUMN), c3 should be NULL
@@ -611,7 +599,7 @@ void testTieringWithAddColumn() throws Exception {
             expectedRows.add(row(5, "v5", 50));
             expectedRows.add(row(6, "v6", 60));
 
-            checkDataInPaimonAppendOnlyTable(tablePath, expectedRows, 0);
+            checkDataInPaimonAppendOnlyTable(tablePath, expectedRows);
 
         } finally {
             jobClient.cancel().get();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 94d56dd338..cfc22115cc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -159,6 +159,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
+import static org.apache.fluss.lake.lakestorage.LakeCatalog.CURRENT_LAKE_STORAGE_VERSION;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
 import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType;
@@ -467,9 +468,16 @@ private TableDescriptor applySystemDefaults(
         }
 
         // override the datalake format if the table hasn't set it and the cluster configured
-        if (dataLakeFormat != null
-                && !properties.containsKey(ConfigOptions.TABLE_DATALAKE_FORMAT.key())) {
-            newDescriptor = newDescriptor.withDataLakeFormat(dataLakeFormat);
+        if (dataLakeFormat != null) {
+            Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+            if (!properties.containsKey(ConfigOptions.TABLE_DATALAKE_FORMAT.key())) {
+                newProperties.put(
+                        ConfigOptions.TABLE_DATALAKE_FORMAT.key(), dataLakeFormat.toString());
+            }
+            newProperties.put(
+                    ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(),
+                    String.valueOf(CURRENT_LAKE_STORAGE_VERSION));
+            newDescriptor = newDescriptor.withProperties(newProperties);
+        }
 
         // lake table can only be enabled when the cluster configures datalake format
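A worked example of the coordinator change above, assuming a cluster configured with Paimon: a client descriptor that sets no datalake options at all ends up stored with

    table.datalake.format = paimon            (defaulted from the cluster configuration)
    table.datalake.storage-version = 2        (always stamped with CURRENT_LAKE_STORAGE_VERSION)

while a descriptor that explicitly sets table.datalake.format keeps its own value and still receives the storage-version stamp. Because only the server-side applySystemDefaults writes this option, the validation below can reject any client-supplied value that differs from the current version.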
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 8aedc194b5..96d29de05c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -49,9 +49,11 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION;
 import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
+import static org.apache.fluss.lake.lakestorage.LakeCatalog.CURRENT_LAKE_STORAGE_VERSION;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -111,6 +113,7 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
         checkTieredLog(tableConf);
         checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType());
         checkSystemColumns(schema.getRowType());
+        checkLakeStorageVersion(tableConf);
     }
 
     public static void validateAlterTableProperties(
@@ -454,4 +457,16 @@ private static void validateOptionValue(ReadableConfig options, ConfigOption
                                 option.key(), t.getMessage()));
         }
     }
+
+    private static void checkLakeStorageVersion(Configuration tableConf) {
+        Optional<Integer> optDataLakeStorageVersion =
+                tableConf.getOptional(TABLE_DATALAKE_STORAGE_VERSION);
+        if (optDataLakeStorageVersion.isPresent()
+                && !optDataLakeStorageVersion.get().equals(CURRENT_LAKE_STORAGE_VERSION)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The option '%s' cannot be set to %d. It is automatically set by Fluss server.",
+                            TABLE_DATALAKE_STORAGE_VERSION.key(), optDataLakeStorageVersion.get()));
+        }
+    }
 }
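From the client's perspective the new check behaves as follows — a hedged sketch, with the builder calls mirroring those used in this patch's tests and the schema variable assumed to be defined:

    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema) // assumed defined elsewhere
                    .property(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION, 1)
                    .build();
    // validateTableDescriptor(descriptor, ...) then throws:
    // IllegalArgumentException: The option 'table.datalake.storage-version'
    // cannot be set to 1. It is automatically set by Fluss server.

Setting the option to the current version (2) passes validation, which is what allows the server to round-trip descriptors it has stamped itself.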