From 2b0026313564222f5edab995c8dd4d9d635eecb0 Mon Sep 17 00:00:00 2001 From: Mehul Batra Date: Mon, 12 Jan 2026 01:36:44 +0530 Subject: [PATCH 1/6] changelog support for pk table --- .gitignore | 1 + .../fluss/metadata/TableDescriptor.java | 5 + .../Flink120ChangelogVirtualTableITCase.java | 21 + .../fluss/flink/catalog/FlinkCatalog.java | 118 +++++ .../flink/catalog/FlinkTableFactory.java | 74 +++ .../source/ChangelogFlinkTableSource.java | 238 +++++++++ .../ChangelogDeserializationSchema.java | 81 ++++ .../flink/utils/ChangelogRowConverter.java | 125 +++++ .../fluss/flink/utils/PlainRowConverter.java | 55 +++ .../utils/RecordToFlinkRowConverter.java | 34 ++ .../source/ChangelogVirtualTableITCase.java | 450 ++++++++++++++++++ .../utils/ChangelogRowConverterTest.java | 209 ++++++++ .../utils/TableDescriptorValidation.java | 8 +- 13 files changed, 1418 insertions(+), 1 deletion(-) create mode 100644 fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java diff --git a/.gitignore b/.gitignore index f0c5b601e9..51b585ba2b 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ website/versioned_docs website/versioned_sidebars website/versions.json website/pnpm-lock.yaml +/.claude/ diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java index e3e4fc931f..7f2d22dc36 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java @@ -60,6 +60,11 @@ public final class TableDescriptor implements Serializable { public static final String TIMESTAMP_COLUMN_NAME = "__timestamp"; public static final String BUCKET_COLUMN_NAME = "__bucket"; + // Reserved column names for $changelog virtual table metadata + public static final String CHANGELOG_CHANGE_TYPE_COLUMN = "_change_type"; + public static final String CHANGELOG_LOG_OFFSET_COLUMN = "_log_offset"; + public static final String CHANGELOG_COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; + private final Schema schema; private final @Nullable String comment; private final List partitionKeys; diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..b0e4a11518 --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.20. */ +public class Flink120ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 9d0151133d..65c244efc1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -112,6 +112,8 @@ public class FlinkCatalog extends AbstractCatalog { public static final String LAKE_TABLE_SPLITTER = "$lake"; + public static final String CHANGELOG_TABLE_SUFFIX = "$changelog"; + public static final String BINLOG_TABLE_SUFFIX = "$binlog"; protected final ClassLoader classLoader; @@ -303,6 +305,18 @@ public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { // may be should be as a datalake table String tableName = objectPath.getObjectName(); + + // Check if this is a virtual table ($changelog or $binlog) + if (tableName.endsWith(CHANGELOG_TABLE_SUFFIX)) { + return getVirtualChangelogTable(objectPath); + } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)) { + // TODO: Implement binlog virtual table in future + throw new UnsupportedOperationException( + String.format( + "$binlog virtual tables are not yet supported for table %s", + objectPath)); + } + TablePath tablePath = toTablePath(objectPath); try { TableInfo tableInfo; @@ -862,4 +876,108 @@ private static boolean isPrefixList(List fullList, List prefixLi } return true; } + + /** + * Creates a virtual $changelog table by modifying the base table's to include metadata columns. + */ + private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath) + throws TableNotExistException, CatalogException { + // Extract the base table name (remove $changelog suffix) + String virtualTableName = objectPath.getObjectName(); + String baseTableName = + virtualTableName.substring( + 0, virtualTableName.length() - CHANGELOG_TABLE_SUFFIX.length()); + + // Get the base table + ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName); + TablePath baseTablePath = toTablePath(baseObjectPath); + + try { + // Retrieve base table info + TableInfo tableInfo = admin.getTableInfo(baseTablePath).get(); + + // Validate that this is a primary key table + if (tableInfo.getPhysicalPrimaryKeys().isEmpty()) { + throw new UnsupportedOperationException( + String.format( + "Virtual $changelog tables are only supported for primary key tables. 
" + + "Table %s does not have a primary key.", + baseTablePath)); + } + + // Convert to Flink table + CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo); + + if (!(catalogBaseTable instanceof CatalogTable)) { + throw new UnsupportedOperationException( + "Virtual $changelog tables are only supported for regular tables"); + } + + CatalogTable baseTable = (CatalogTable) catalogBaseTable; + + // Build the changelog schema by adding metadata columns + Schema originalSchema = baseTable.getUnresolvedSchema(); + Schema changelogSchema = buildChangelogSchema(originalSchema); + + // Copy options from base table + Map newOptions = new HashMap<>(baseTable.getOptions()); + newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers); + newOptions.putAll(securityConfigs); + + // Create a new CatalogTable with the modified schema + return CatalogTable.of( + changelogSchema, + baseTable.getComment(), + baseTable.getPartitionKeys(), + newOptions); + + } catch (Exception e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + if (isTableNotExist(t)) { + throw new TableNotExistException(getName(), baseObjectPath); + } else { + throw new CatalogException( + String.format( + "Failed to get virtual changelog table %s in %s", + objectPath, getName()), + t); + } + } + } + + private Schema buildChangelogSchema(Schema originalSchema) { + Schema.Builder builder = Schema.newBuilder(); + + // Add metadata columns first + builder.column("_change_type", org.apache.flink.table.api.DataTypes.STRING().notNull()); + builder.column("_log_offset", org.apache.flink.table.api.DataTypes.BIGINT().notNull()); + builder.column( + "_commit_timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(3).notNull()); + + // Add all original columns + for (Schema.UnresolvedColumn column : originalSchema.getColumns()) { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + Schema.UnresolvedPhysicalColumn physicalColumn = + (Schema.UnresolvedPhysicalColumn) column; + builder.column(physicalColumn.getName(), physicalColumn.getDataType()); + } else if (column instanceof Schema.UnresolvedComputedColumn) { + Schema.UnresolvedComputedColumn computedColumn = + (Schema.UnresolvedComputedColumn) column; + builder.columnByExpression( + computedColumn.getName(), computedColumn.getExpression()); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + Schema.UnresolvedMetadataColumn metadataColumn = + (Schema.UnresolvedMetadataColumn) column; + builder.columnByMetadata( + metadataColumn.getName(), + metadataColumn.getDataType(), + metadataColumn.getMetadataKey(), + metadataColumn.isVirtual()); + } + } + + // Note: We don't copy primary keys or watermarks for virtual tables + + return builder.build(); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 48d74e712e..189df7d3b9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -23,6 +23,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.lake.LakeTableFactory; import org.apache.fluss.flink.sink.FlinkTableSink; +import org.apache.fluss.flink.source.ChangelogFlinkTableSource; import org.apache.fluss.flink.sink.shuffle.DistributionMode; import 
org.apache.fluss.flink.source.FlinkTableSource; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; @@ -90,6 +91,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { return lakeTableFactory.createDynamicTableSource(context, lakeTableName); } + // Check if this is a $changelog suffix in table name + if (tableName.endsWith(FlinkCatalog.CHANGELOG_TABLE_SUFFIX)) { + return createChangelogTableSource(context, tableIdentifier, tableName); + } + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); Optional datalakeFormat = getDatalakeFormat(tableOptions); @@ -267,4 +273,72 @@ private LakeTableFactory mayInitLakeTableFactory() { } return lakeTableFactory; } + + /** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */ + private DynamicTableSource createChangelogTableSource( + Context context, ObjectIdentifier tableIdentifier, String tableName) { + // Extract the base table name by removing the $changelog suffix + String baseTableName = + tableName.substring( + 0, tableName.length() - FlinkCatalog.CHANGELOG_TABLE_SUFFIX.length()); + + boolean isStreamingMode = + context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + == RuntimeExecutionMode.STREAMING; + + // tableOutputType includes metadata columns: [_change_type, _log_offset, _commit_timestamp, + // data_cols...] + RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); + + // Extract data columns type (skip the 3 metadata columns) for index calculations + int numMetadataColumns = 3; + List dataFields = + tableOutputType + .getFields() + .subList(numMetadataColumns, tableOutputType.getFieldCount()); + RowType dataColumnsType = new RowType(new ArrayList<>(dataFields)); + + Map catalogTableOptions = context.getCatalogTable().getOptions(); + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig tableOptions = helper.getOptions(); + + ZoneId timeZone = + FlinkConnectorOptionsUtils.getLocalTimeZone( + context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE)); + final FlinkConnectorOptionsUtils.StartupOptions startupOptions = + FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, timeZone); + + ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); + + // bucket.key is set from the base table's primary key + int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, dataColumnsType); + + // For changelog virtual tables, if bucket key is present, base table has a PK + int[] primaryKeyIndexes = bucketKeyIndexes; + + // Partition key indexes based on data columns + int[] partitionKeyIndexes = + resolvedCatalogTable.getPartitionKeys().stream() + .mapToInt(dataColumnsType::getFieldIndex) + .toArray(); + + long partitionDiscoveryIntervalMs = + tableOptions + .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) + .toMillis(); + + return new ChangelogFlinkTableSource( + TablePath.of(tableIdentifier.getDatabaseName(), baseTableName), + toFlussClientConfig(catalogTableOptions, context.getConfiguration()), + tableOutputType, + primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, + isStreamingMode, + startupOptions, + partitionDiscoveryIntervalMs, + tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), + tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), + catalogTableOptions); + } } diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java new file mode 100644 index 0000000000..b74da53d65 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema; +import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer; +import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; +import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.metadata.MergeEngineType; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.types.RowType; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** A Flink table source for the $changelog virtual table. 
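Each change from the base primary key table's log is exposed as an insert-only row whose _change_type, _log_offset and _commit_timestamp metadata columns precede the data columns, e.g. {@code SELECT * FROM orders$changelog}.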
*/ +public class ChangelogFlinkTableSource implements ScanTableSource { + + private static final String CHANGE_TYPE_COLUMN = "_change_type"; + private static final String LOG_OFFSET_COLUMN = "_log_offset"; + private static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; + + private final TablePath tablePath; + private final Configuration flussConfig; + // The changelog output type (includes metadata columns: _change_type, _log_offset, + // _commit_timestamp) + private final org.apache.flink.table.types.logical.RowType changelogOutputType; + private final org.apache.flink.table.types.logical.RowType dataColumnsType; + private final int[] primaryKeyIndexes; + private final int[] bucketKeyIndexes; + private final int[] partitionKeyIndexes; + private final boolean streaming; + private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; + private final long scanPartitionDiscoveryIntervalMs; + private final boolean isDataLakeEnabled; + @Nullable private final MergeEngineType mergeEngineType; + private final Map tableOptions; + + // Projection pushdown + @Nullable private int[] projectedFields; + private LogicalType producedDataType; + + @Nullable private Predicate partitionFilters; + + /** Number of metadata columns prepended to the changelog schema. */ + private static final int NUM_METADATA_COLUMNS = 3; + + public ChangelogFlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + org.apache.flink.table.types.logical.RowType changelogOutputType, + int[] primaryKeyIndexes, + int[] bucketKeyIndexes, + int[] partitionKeyIndexes, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + long scanPartitionDiscoveryIntervalMs, + boolean isDataLakeEnabled, + @Nullable MergeEngineType mergeEngineType, + Map tableOptions) { + this.tablePath = tablePath; + this.flussConfig = flussConfig; + // The changelogOutputType already includes metadata columns from FlinkCatalog + this.changelogOutputType = changelogOutputType; + this.primaryKeyIndexes = primaryKeyIndexes; + this.bucketKeyIndexes = bucketKeyIndexes; + this.partitionKeyIndexes = partitionKeyIndexes; + this.streaming = streaming; + this.startupOptions = startupOptions; + this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.isDataLakeEnabled = isDataLakeEnabled; + this.mergeEngineType = mergeEngineType; + this.tableOptions = tableOptions; + + // Extract data columns by removing the first 3 metadata columns + this.dataColumnsType = extractDataColumnsType(changelogOutputType); + this.producedDataType = changelogOutputType; + } + + /** + * Extracts the data columns type by removing the metadata columns from the changelog output + * type. + */ + private org.apache.flink.table.types.logical.RowType extractDataColumnsType( + org.apache.flink.table.types.logical.RowType changelogType) { + List allFields = + changelogType.getFields(); + + // Skip the first NUM_METADATA_COLUMNS fields (metadata columns) + List dataFields = + allFields.subList(NUM_METADATA_COLUMNS, allFields.size()); + + return new org.apache.flink.table.types.logical.RowType(new ArrayList<>(dataFields)); + } + + @Override + public ChangelogMode getChangelogMode() { + // The $changelog virtual table always produces INSERT-only records. 
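+ // Downstream operators therefore see an append-only stream.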
+ // All change types (+I, -U, +U, -D) are flattened into regular rows + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + // Create the Fluss row type for the data columns (without metadata) + RowType flussRowType = FlinkConversions.toFlussRowType(dataColumnsType); + if (projectedFields != null) { + // Adjust projection to account for metadata columns + // TODO: Handle projection properly with metadata columns + flussRowType = flussRowType.project(projectedFields); + } + // to capture all change types (+I, -U, +U, -D). + // FULL mode reads snapshot first (no change types), so we use EARLIEST for log-only + // reading. + // LATEST mode is supported for real-time changelog streaming from current position. + OffsetsInitializer offsetsInitializer; + switch (startupOptions.startupMode) { + case EARLIEST: + case FULL: + // For changelog, FULL mode should read all log records from beginning + offsetsInitializer = OffsetsInitializer.earliest(); + break; + case LATEST: + offsetsInitializer = OffsetsInitializer.latest(); + break; + case TIMESTAMP: + offsetsInitializer = + OffsetsInitializer.timestamp(startupOptions.startupTimestampMs); + break; + default: + throw new IllegalArgumentException( + "Unsupported startup mode: " + startupOptions.startupMode); + } + + // Create the source with the changelog deserialization schema + FlinkSource source = + new FlinkSource<>( + flussConfig, + tablePath, + hasPrimaryKey(), + isPartitioned(), + flussRowType, + projectedFields, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + new ChangelogDeserializationSchema(dataColumnsType), + streaming, + partitionFilters, + null); // Lake source not supported + + if (!streaming) { + // Batch mode + return new SourceProvider() { + @Override + public boolean isBounded() { + return true; + } + + @Override + public Source createSource() { + if (!isDataLakeEnabled) { + throw new UnsupportedOperationException( + "Changelog virtual table requires data lake to be enabled for batch queries"); + } + return source; + } + }; + } else { + return SourceProvider.of(source); + } + } + + @Override + public DynamicTableSource copy() { + ChangelogFlinkTableSource copy = + new ChangelogFlinkTableSource( + tablePath, + flussConfig, + changelogOutputType, + primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, + streaming, + startupOptions, + scanPartitionDiscoveryIntervalMs, + isDataLakeEnabled, + mergeEngineType, + tableOptions); + copy.producedDataType = producedDataType; + copy.projectedFields = projectedFields; + copy.partitionFilters = partitionFilters; + return copy; + } + + @Override + public String asSummaryString() { + return "ChangelogFlinkTableSource"; + } + + // TODO: Implement projection pushdown handling for metadata columns + // TODO: Implement filter pushdown + + private boolean hasPrimaryKey() { + return primaryKeyIndexes.length > 0; + } + + private boolean isPartitioned() { + return partitionKeyIndexes.length > 0; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java new file mode 100644 index 0000000000..f307e95a8b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.deserializer; + +import org.apache.fluss.flink.utils.ChangelogRowConverter; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.types.RowType; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType; + +/** + * A deserialization schema that converts {@link LogRecord} objects to Flink's {@link RowData} + * format with additional changelog metadata columns. + */ +public class ChangelogDeserializationSchema implements FlussDeserializationSchema { + private static final long serialVersionUID = 1L; + + private final org.apache.flink.table.types.logical.RowType originalTableType; + + /** + * Converter responsible for transforming Fluss row data into Flink's {@link RowData} format + * with metadata columns. Initialized during {@link #open(InitializationContext)}. + */ + private transient ChangelogRowConverter converter; + + /** Creates a new ChangelogDeserializationSchema. */ + public ChangelogDeserializationSchema( + org.apache.flink.table.types.logical.RowType originalTableType) { + this.originalTableType = originalTableType; + } + + /** Initializes the deserialization schema. */ + @Override + public void open(InitializationContext context) throws Exception { + if (converter == null) { + this.converter = new ChangelogRowConverter(context.getRowSchema()); + } + } + + /** + * Deserializes a {@link LogRecord} into a Flink {@link RowData} object with metadata columns. + */ + @Override + public RowData deserialize(LogRecord record) throws Exception { + if (converter == null) { + throw new IllegalStateException( + "Converter not initialized. The open() method must be called before deserializing records."); + } + return converter.toChangelogRowData(record); + } + + /** + * Returns the TypeInformation for the produced {@link RowData} type including metadata columns. 
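+ * The produced type is the base row type with the three changelog metadata columns prepended, as built by {@link ChangelogRowConverter#buildChangelogRowType}.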
+ */ + @Override + public TypeInformation getProducedType(RowType rowSchema) { + // Build the output type with metadata columns + org.apache.flink.table.types.logical.RowType outputType = + ChangelogRowConverter.buildChangelogRowType(toFlinkRowType(rowSchema)); + return InternalTypeInfo.of(outputType); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java new file mode 100644 index 0000000000..a79e168ced --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.RowKind; + +import java.util.ArrayList; +import java.util.List; + +/** + * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with additional + * metadata columns for the $changelog virtual table. + */ +public class ChangelogRowConverter implements RecordToFlinkRowConverter { + + private static final long serialVersionUID = 1L; + + private final FlussRowToFlinkRowConverter baseConverter; + private final RowType flussRowType; + private final org.apache.flink.table.types.logical.RowType producedType; + + /** Creates a new ChangelogRowConverter. */ + public ChangelogRowConverter(RowType rowType) { + this.flussRowType = rowType; + this.baseConverter = new FlussRowToFlinkRowConverter(rowType); + this.producedType = buildChangelogRowType(FlinkConversions.toFlinkRowType(rowType)); + } + + /** Converts a LogRecord to a Flink RowData with metadata columns. */ + public RowData toChangelogRowData(LogRecord record) { + + RowData physicalRowData = baseConverter.toFlinkRowData(record); + + // Create metadata row with 3 fields + GenericRowData metadataRow = new GenericRowData(3); + + // 1. _change_type + String changeTypeStr = convertChangeTypeToString(record.getChangeType()); + metadataRow.setField(0, StringData.fromString(changeTypeStr)); + + // 2. 
_log_offset + metadataRow.setField(1, record.logOffset()); + + // 3. _commit_timestamp (convert long to TimestampData) + metadataRow.setField(2, TimestampData.fromEpochMillis(record.timestamp())); + + // Use JoinedRowData to efficiently combine metadata and physical rows + JoinedRowData joinedRow = new JoinedRowData(metadataRow, physicalRowData); + joinedRow.setRowKind(RowKind.INSERT); + + return joinedRow; + } + + @Override + public RowData convert(LogRecord record) { + return toChangelogRowData(record); + } + + @Override + public org.apache.flink.table.types.logical.RowType getProducedType() { + return producedType; + } + + /** Converts a Fluss ChangeType to its string representation for the changelog virtual table. */ + private String convertChangeTypeToString(ChangeType changeType) { + // Use the short string representation from ChangeType + return changeType.shortString(); + } + + /** + * Builds the Flink RowType for the changelog virtual table by adding metadata columns. + * + * @param originalType The original table's row type + * @return The row type with metadata columns prepended + */ + public static org.apache.flink.table.types.logical.RowType buildChangelogRowType( + org.apache.flink.table.types.logical.RowType originalType) { + List fields = new ArrayList<>(); + + // Add metadata columns first (using centralized constants from TableDescriptor) + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.CHANGELOG_CHANGE_TYPE_COLUMN, new VarCharType(false, 2))); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.CHANGELOG_LOG_OFFSET_COLUMN, new BigIntType(false))); + fields.add( + new org.apache.flink.table.types.logical.RowType.RowField( + TableDescriptor.CHANGELOG_COMMIT_TIMESTAMP_COLUMN, + new TimestampType(false, 3))); + + // Add all original fields + fields.addAll(originalType.getFields()); + + return new org.apache.flink.table.types.logical.RowType(fields); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java new file mode 100644 index 0000000000..e3e0b6d2bf --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.data.RowData; + +import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType; + +/** + * Standard converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} without + * any additional metadata columns. + * + *

This is the default converter used for regular (non-virtual) tables. + */ +public class PlainRowConverter implements RecordToFlinkRowConverter { + + private static final long serialVersionUID = 1L; + + private final FlussRowToFlinkRowConverter baseConverter; + private final org.apache.flink.table.types.logical.RowType flinkRowType; + + /** Creates a new PlainRowConverter. */ + public PlainRowConverter(RowType rowType) { + this.baseConverter = new FlussRowToFlinkRowConverter(rowType); + this.flinkRowType = toFlinkRowType(rowType); + } + + @Override + public RowData convert(LogRecord record) { + return baseConverter.toFlinkRowData(record); + } + + @Override + public org.apache.flink.table.types.logical.RowType getProducedType() { + return flinkRowType; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java new file mode 100644 index 0000000000..ec561a5a5b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.record.LogRecord; + +import org.apache.flink.table.data.RowData; + +import java.io.Serializable; + +/** Interface for converting Fluss {@link LogRecord} to Flink {@link RowData}. */ +public interface RecordToFlinkRowConverter extends Serializable { + + /** Converts a LogRecord to a Flink RowData. */ + RowData convert(LogRecord record); + + /** Gets the output Flink row type produced by this converter. */ + org.apache.flink.table.types.logical.RowType getProducedType(); +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..5ed3864eba --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; +import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Integration test for $changelog virtual table functionality. 
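Covers schema introspection via DESCRIBE as well as capturing +I/-U/+U/-D changes produced by upserts and deletes on primary key tables.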
*/ +abstract class ChangelogVirtualTableITCase extends AbstractTestBase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf(new Configuration()) + .setNumOfTabletServers(1) + .build(); + + static final String CATALOG_NAME = "testcatalog"; + static final String DEFAULT_DB = "test_changelog_db"; + protected StreamExecutionEnvironment execEnv; + protected StreamTableEnvironment tEnv; + protected static Connection conn; + protected static Admin admin; + + protected static Configuration clientConf; + + @BeforeAll + protected static void beforeAll() { + clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + + @BeforeEach + void before() { + // Initialize Flink environment + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + + // Initialize catalog and database + String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.executeSql("use catalog " + CATALOG_NAME); + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + tEnv.executeSql("create database " + DEFAULT_DB); + tEnv.useDatabase(DEFAULT_DB); + } + + @AfterEach + void after() { + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); + } + + /** Deletes rows from a primary key table using the proper delete API. */ + protected static void deleteRows( + Connection connection, TablePath tablePath, List rows) throws Exception { + try (Table table = connection.getTable(tablePath)) { + UpsertWriter writer = table.newUpsert().createWriter(); + for (InternalRow row : rows) { + writer.delete(row); + } + writer.flush(); + } + } + + @Test + public void testChangelogVirtualTableWithPrimaryKeyTable() throws Exception { + // Create a primary key table + tEnv.executeSql( + "CREATE TABLE orders (" + + " order_id INT NOT NULL," + + " product_name STRING," + + " amount BIGINT," + + " PRIMARY KEY (order_id) NOT ENFORCED" + + ")"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "orders"); + + // Insert initial data + List initialRows = + Arrays.asList( + row(1, "Product A", 100L), + row(2, "Product B", 200L), + row(3, "Product C", 300L)); + writeRows(conn, tablePath, initialRows, false); + + // Query the changelog virtual table + String query = "SELECT * FROM orders$changelog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Collect initial inserts (don't close iterator - we need it for more batches) + List results = new ArrayList<>(); + List batch1 = collectRowsWithTimeout(rowIter, 3, false); + results.addAll(batch1); + + // Verify initial data has INSERT change type + for (String result : batch1) { + // Result format: +I[change_type, offset, timestamp, order_id, product_name, amount] + assertThat(result).startsWith("+I[+I,"); + } + + // Update some records + List updateRows = + Arrays.asList(row(1, "Product A Updated", 150L), row(2, "Product B Updated", 250L)); + writeRows(conn, tablePath, updateRows, false); + + // Collect updates (don't close iterator yet) + List batch2 = collectRowsWithTimeout(rowIter, 4, false); + results.addAll(batch2); + + // Verify we see 
UPDATE_BEFORE (-U) and UPDATE_AFTER (+U) records + long updateBeforeCount = batch2.stream().filter(r -> r.contains("[-U,")).count(); + long updateAfterCount = batch2.stream().filter(r -> r.contains("[+U,")).count(); + assertThat(updateBeforeCount).isEqualTo(2); + assertThat(updateAfterCount).isEqualTo(2); + + // Delete a record using the proper delete API + // Note: delete() expects the full row with actual values, not nulls + deleteRows(conn, tablePath, Arrays.asList(row(3, "Product C", 300L))); + + // Collect delete (close iterator after this) + List batch3 = collectRowsWithTimeout(rowIter, 1, true); + results.addAll(batch3); + + // Verify we see DELETE (-D) record + // Note: Fluss DELETE operation produces ChangeType.DELETE which maps to "-D" + // The test verifies that a delete record is captured in the changelog + assertThat(batch3.get(0)).contains("3"); // The deleted row ID should be present + + // Verify metadata columns are present in all records + for (String result : results) { + // Each row should have: change_type, log_offset, timestamp, then original columns + String[] parts = result.substring(3, result.length() - 1).split(", "); + assertThat(parts.length).isGreaterThanOrEqualTo(6); // 3 metadata + 3 data columns + } + } + + @Test + public void testChangelogVirtualTableSchemaIntrospection() throws Exception { + // Create a primary key table + tEnv.executeSql( + "CREATE TABLE products (" + + " id INT NOT NULL," + + " name STRING," + + " price DECIMAL(10, 2)," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Test DESCRIBE on changelog virtual table + CloseableIterator describeResult = + tEnv.executeSql("DESCRIBE products$changelog").collect(); + + List schemaRows = new ArrayList<>(); + while (describeResult.hasNext()) { + schemaRows.add(describeResult.next().toString()); + } + + // Verify metadata columns are listed first + // Format: +I[column_name, type, nullable (true/false), ...] + assertThat(schemaRows.get(0)).contains("_change_type"); + assertThat(schemaRows.get(0)).contains("STRING"); + // Flink DESCRIBE shows nullability as 'false' for NOT NULL columns + assertThat(schemaRows.get(0)).contains("false"); + + assertThat(schemaRows.get(1)).contains("_log_offset"); + assertThat(schemaRows.get(1)).contains("BIGINT"); + assertThat(schemaRows.get(1)).contains("false"); + + assertThat(schemaRows.get(2)).contains("_commit_timestamp"); + assertThat(schemaRows.get(2)).contains("TIMESTAMP"); + assertThat(schemaRows.get(2)).contains("false"); + + // Verify original columns follow + assertThat(schemaRows.get(3)).contains("id"); + assertThat(schemaRows.get(4)).contains("name"); + assertThat(schemaRows.get(5)).contains("price"); + } + + @Test + public void testChangelogVirtualTableWithNonPrimaryKeyTable() { + // Create a non-primary key table (log table) + tEnv.executeSql( + "CREATE TABLE events (" + + " event_id INT," + + " event_type STRING," + + " event_time TIMESTAMP" + + ")"); + + // Attempt to query changelog virtual table should fail + String query = "SELECT * FROM events$changelog"; + + // The error message is wrapped in a CatalogException, so we check for the root cause + assertThatThrownBy(() -> tEnv.executeSql(query).await()) + .hasRootCauseMessage( + "Virtual $changelog tables are only supported for primary key tables. 
" + + "Table test_changelog_db.events does not have a primary key."); + } + + @Test + public void testAllChangeTypes() throws Exception { + // Create a primary key table + // Note: Using `val` instead of `value` as `value` is a reserved keyword in Flink SQL + tEnv.executeSql( + "CREATE TABLE test_changes (" + + " id INT NOT NULL," + + " val STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "test_changes"); + + String query = "SELECT _change_type, id, val FROM test_changes$changelog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Test INSERT (+I) + writeRows(conn, tablePath, Arrays.asList(row(1, "initial")), false); + List insertResult = collectRowsWithTimeout(rowIter, 1, false); + assertThat(insertResult.get(0)).startsWith("+I[+I, 1, initial]"); + + // Test UPDATE (-U/+U) + writeRows(conn, tablePath, Arrays.asList(row(1, "updated")), false); + List updateResults = collectRowsWithTimeout(rowIter, 2, false); + assertThat(updateResults.get(0)).startsWith("+I[-U, 1, initial]"); + assertThat(updateResults.get(1)).startsWith("+I[+U, 1, updated]"); + + // Test DELETE operation using the proper delete API + deleteRows(conn, tablePath, Arrays.asList(row(1, "updated"))); + List deleteResult = collectRowsWithTimeout(rowIter, 1, true); + // Verify the delete record contains the row data and has DELETE change type (-D) + // DELETE produces ChangeType.DELETE which maps to "-D" in the changelog + assertThat(deleteResult.get(0)).startsWith("+I[-D, 1, updated]"); + } + + @Test + public void testChangelogVirtualTableConcurrentChanges() throws Exception { + // Create a primary key table + tEnv.executeSql( + "CREATE TABLE concurrent_test (" + + " id INT NOT NULL," + + " counter INT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "concurrent_test"); + + // Start collecting from changelog + String query = "SELECT _change_type, id, counter FROM concurrent_test$changelog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Perform multiple concurrent-like changes + for (int i = 1; i <= 5; i++) { + writeRows(conn, tablePath, Arrays.asList(row(i, i * 10)), false); + } + + // Collect all inserts (don't close iterator - we need it for updates) + List results = collectRowsWithTimeout(rowIter, 5, false); + + // Verify all are inserts + for (String result : results) { + assertThat(result).startsWith("+I[+I,"); + } + + // Update all records + for (int i = 1; i <= 5; i++) { + writeRows(conn, tablePath, Arrays.asList(row(i, i * 20)), false); + } + + // Collect all updates (5 * 2 = 10 records: before and after for each) + // Now we can close the iterator + results = collectRowsWithTimeout(rowIter, 10, true); + + // Verify we have equal number of -U and +U + long updateBeforeCount = results.stream().filter(r -> r.contains("[-U,")).count(); + long updateAfterCount = results.stream().filter(r -> r.contains("[+U,")).count(); + assertThat(updateBeforeCount).isEqualTo(5); + assertThat(updateAfterCount).isEqualTo(5); + } + + @Test + public void testChangelogVirtualTableWithComplexSchema() throws Exception { + // Create a table with various data types + tEnv.executeSql( + "CREATE TABLE complex_table (" + + " id INT NOT NULL," + + " name STRING," + + " score DOUBLE," + + " is_active BOOLEAN," + + " created_date DATE," + + " metadata MAP," + + " tags ARRAY," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Verify the schema includes metadata columns + CloseableIterator 
describeResult = + tEnv.executeSql("DESCRIBE complex_table$changelog").collect(); + + List schemaRows = new ArrayList<>(); + while (describeResult.hasNext()) { + schemaRows.add(describeResult.next().toString()); + } + + // Should have 3 metadata columns + 7 data columns = 10 total + assertThat(schemaRows).hasSize(10); + + // Verify metadata columns + assertThat(schemaRows.get(0)).contains("_change_type"); + assertThat(schemaRows.get(1)).contains("_log_offset"); + assertThat(schemaRows.get(2)).contains("_commit_timestamp"); + + // Verify data columns maintain their types + assertThat(schemaRows.get(3)).contains("id"); + assertThat(schemaRows.get(4)).contains("name"); + assertThat(schemaRows.get(5)).contains("score"); + assertThat(schemaRows.get(6)).contains("is_active"); + assertThat(schemaRows.get(7)).contains("created_date"); + assertThat(schemaRows.get(8)).contains("metadata"); + assertThat(schemaRows.get(9)).contains("tags"); + } + + @Test + public void testBasicChangelogScanWithMetadataValidation() throws Exception { + // Create a primary key table + // Note: Avoiding `value` as it's a reserved keyword in Flink SQL + tEnv.executeSql( + "CREATE TABLE scan_test (" + + " id INT NOT NULL," + + " name STRING," + + " amount BIGINT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "scan_test"); + + // Start changelog scan + String query = "SELECT * FROM scan_test$changelog"; + CloseableIterator rowIter = tEnv.executeSql(query).collect(); + + // Insert initial data + List initialData = + Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)); + writeRows(conn, tablePath, initialData, false); + + // Collect and validate inserts (don't close iterator - we need it for update/delete tests) + List results = collectRowsWithTimeout(rowIter, 2, false); + + // Validate that we received 2 INSERT records + assertThat(results).hasSize(2); + + // Validate metadata columns are present and correctly formatted + for (String result : results) { + // Parse the row to validate structure + String[] parts = result.substring(3, result.length() - 1).split(", ", 6); + + // Validate change type column + assertThat(parts[0]).isEqualTo("+I"); + + // Validate log offset column (should be a valid long) + assertThat(Long.parseLong(parts[1])).isGreaterThanOrEqualTo(0); + + // Validate timestamp column exists (we can't predict exact value) + assertThat(parts[2]).isNotEmpty(); + + // Validate data columns follow metadata + int id = Integer.parseInt(parts[3]); + assertThat(id).isIn(1, 2); + assertThat(parts[4]).isIn("Item-1", "Item-2"); + assertThat(Long.parseLong(parts[5])).isIn(100L, 200L); + } + + // Test an update operation + writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false); + + // Collect update records (should get -U and +U) + List updateResults = collectRowsWithTimeout(rowIter, 2, false); + assertThat(updateResults).hasSize(2); + + // Validate UPDATE_BEFORE record + assertThat(updateResults.get(0)).contains("-U"); + assertThat(updateResults.get(0)).contains("Item-1"); + assertThat(updateResults.get(0)).contains("100"); + + // Validate UPDATE_AFTER record + assertThat(updateResults.get(1)).contains("+U"); + assertThat(updateResults.get(1)).contains("Item-1-Updated"); + assertThat(updateResults.get(1)).contains("150"); + + // Test delete operation using the proper delete API + deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L))); + + // Collect delete record + List deleteResult = collectRowsWithTimeout(rowIter, 1, true); 
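+ // Passing true closes the iterator once this final delete batch has been collected.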
+ assertThat(deleteResult).hasSize(1); + // Verify the delete record contains the row data (the change type may be -D or -U) + assertThat(deleteResult.get(0)).contains("2"); + assertThat(deleteResult.get(0)).contains("Item-2"); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java new file mode 100644 index 0000000000..594903240a --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.GenericRecord; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.row.indexed.IndexedRowWriter; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.types.RowKind; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link ChangelogRowConverter}. 
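Verifies the string mapping for each {@link ChangeType} and the metadata columns of the produced row type.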
*/ +class ChangelogRowConverterTest { + + private RowType testRowType; + private ChangelogRowConverter converter; + + @BeforeEach + void setUp() { + // Create a simple test table schema: (id INT, name STRING, amount BIGINT) + testRowType = + RowType.builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("amount", DataTypes.BIGINT()) + .build(); + + converter = new ChangelogRowConverter(testRowType); + } + + @Test + void testConvertInsertRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1, "Alice", 5000L); + + RowData result = converter.convert(record); + + // Verify row kind + assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); + + // Verify metadata columns + assertThat(result.getString(0)).isEqualTo(StringData.fromString("+I")); + assertThat(result.getLong(1)).isEqualTo(100L); // log offset + assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp + + // Verify physical columns + assertThat(result.getInt(3)).isEqualTo(1); // id + assertThat(result.getString(4).toString()).isEqualTo("Alice"); // name + assertThat(result.getLong(5)).isEqualTo(5000L); // amount + + // Verify it's a JoinedRowData + assertThat(result).isInstanceOf(JoinedRowData.class); + } + + @Test + void testConvertUpdateBeforeRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2, "Bob", 3000L); + + RowData result = converter.convert(record); + + // Verify row kind (always INSERT for virtual table) + assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT); + + // Verify change type metadata + assertThat(result.getString(0)).isEqualTo(StringData.fromString("-U")); + assertThat(result.getLong(1)).isEqualTo(200L); + + // Verify physical columns + assertThat(result.getInt(3)).isEqualTo(2); + assertThat(result.getString(4).toString()).isEqualTo("Bob"); + assertThat(result.getLong(5)).isEqualTo(3000L); + } + + @Test + void testConvertUpdateAfterRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2, "Bob", 4000L); + + RowData result = converter.convert(record); + + assertThat(result.getString(0)).isEqualTo(StringData.fromString("+U")); + assertThat(result.getLong(1)).isEqualTo(201L); + assertThat(result.getInt(3)).isEqualTo(2); + assertThat(result.getString(4).toString()).isEqualTo("Bob"); + assertThat(result.getLong(5)).isEqualTo(4000L); + } + + @Test + void testConvertDeleteRecord() throws Exception { + LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3, "Charlie", 1000L); + + RowData result = converter.convert(record); + + assertThat(result.getString(0)).isEqualTo(StringData.fromString("-D")); + assertThat(result.getLong(1)).isEqualTo(300L); + assertThat(result.getInt(3)).isEqualTo(3); + assertThat(result.getString(4).toString()).isEqualTo("Charlie"); + assertThat(result.getLong(5)).isEqualTo(1000L); + } + + @Test + void testProducedTypeHasMetadataColumns() { + org.apache.flink.table.types.logical.RowType producedType = converter.getProducedType(); + + // Should have 3 metadata columns + 3 physical columns + assertThat(producedType.getFieldCount()).isEqualTo(6); + + // Check metadata column names and types + assertThat(producedType.getFieldNames()) + .containsExactly( + "_change_type", "_log_offset", "_commit_timestamp", "id", "name", "amount"); + + // Check metadata column types + assertThat(producedType.getTypeAt(0)) + .isInstanceOf(org.apache.flink.table.types.logical.VarCharType.class); + assertThat(producedType.getTypeAt(1)) + 
.isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class); + assertThat(producedType.getTypeAt(2)) + .isInstanceOf(org.apache.flink.table.types.logical.TimestampType.class); + } + + @Test + void testAllChangeTypes() throws Exception { + // Test all change type conversions + assertThat( + converter + .convert(createLogRecord(ChangeType.INSERT, 1L, 1, "Test", 100L)) + .getString(0)) + .isEqualTo(StringData.fromString("+I")); + + assertThat( + converter + .convert( + createLogRecord( + ChangeType.UPDATE_BEFORE, 2L, 1, "Test", 100L)) + .getString(0)) + .isEqualTo(StringData.fromString("-U")); + + assertThat( + converter + .convert( + createLogRecord( + ChangeType.UPDATE_AFTER, 3L, 1, "Test", 100L)) + .getString(0)) + .isEqualTo(StringData.fromString("+U")); + + assertThat( + converter + .convert(createLogRecord(ChangeType.DELETE, 4L, 1, "Test", 100L)) + .getString(0)) + .isEqualTo(StringData.fromString("-D")); + + // For log tables (append-only) + assertThat( + converter + .convert( + createLogRecord( + ChangeType.APPEND_ONLY, 5L, 1, "Test", 100L)) + .getString(0)) + .isEqualTo(StringData.fromString("+A")); + } + + private LogRecord createLogRecord( + ChangeType changeType, long offset, int id, String name, long amount) throws Exception { + // Create an IndexedRow with test data + IndexedRow row = new IndexedRow(testRowType.getChildren().toArray(new DataType[0])); + try (IndexedRowWriter writer = + new IndexedRowWriter(testRowType.getChildren().toArray(new DataType[0]))) { + writer.writeInt(id); + writer.writeString(BinaryString.fromString(name)); + writer.writeLong(amount); + writer.complete(); + + row.pointTo(writer.segment(), 0, writer.position()); + + return new GenericRecord( + offset, // log offset + System.currentTimeMillis(), // timestamp + changeType, // change type + row // row data + ); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 8aedc194b5..cb2afdd935 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -53,6 +53,9 @@ import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption; import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.CHANGELOG_CHANGE_TYPE_COLUMN; +import static org.apache.fluss.metadata.TableDescriptor.CHANGELOG_COMMIT_TIMESTAMP_COLUMN; +import static org.apache.fluss.metadata.TableDescriptor.CHANGELOG_LOG_OFFSET_COLUMN; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.utils.PartitionUtils.PARTITION_KEY_SUPPORTED_TYPES; @@ -66,7 +69,10 @@ public class TableDescriptorValidation { Arrays.asList( OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME, - BUCKET_COLUMN_NAME))); + BUCKET_COLUMN_NAME, + CHANGELOG_CHANGE_TYPE_COLUMN, + CHANGELOG_LOG_OFFSET_COLUMN, + CHANGELOG_COMMIT_TIMESTAMP_COLUMN))); private static final List KEY_UNSUPPORTED_TYPES = Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW); From 376d70f6de619122ccb9a1fba7863bf79b00c845 Mon Sep 17 00:00:00 2001 From: Mehul Batra Date: Thu, 22 Jan 2026 14:59:25 +0530 Subject: [PATCH 2/6] 
address Jark comments --- .gitignore | 4 +- .../fluss/client/admin/FlussAdminITCase.java | 27 ++ .../fluss/metadata/TableDescriptor.java | 8 +- .../Flink118ChangelogVirtualTableITCase.java | 21 + .../Flink119ChangelogVirtualTableITCase.java | 21 + .../Flink22ChangelogVirtualTableITCase.java | 21 + .../fluss/flink/catalog/FlinkCatalog.java | 43 +- .../flink/catalog/FlinkTableFactory.java | 27 +- .../source/ChangelogFlinkTableSource.java | 50 +- .../ChangelogDeserializationSchema.java | 8 +- .../flink/utils/ChangelogRowConverter.java | 17 +- .../fluss/flink/utils/PlainRowConverter.java | 55 --- .../flink/catalog/FlinkCatalogITCase.java | 47 ++ .../source/ChangelogVirtualTableITCase.java | 434 ++++++++---------- .../utils/TableDescriptorValidation.java | 12 +- 15 files changed, 406 insertions(+), 389 deletions(-) create mode 100644 fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java create mode 100644 fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java delete mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java diff --git a/.gitignore b/.gitignore index 51b585ba2b..845fa7ff97 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,9 @@ dependency-reduced-pom.xml ### VS Code ### .vscode/ +### claude code ### +.claude/ + ### Mac OS ### .DS_Store @@ -42,4 +45,3 @@ website/versioned_docs website/versioned_sidebars website/versions.json website/pnpm-lock.yaml -/.claude/ diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 64cd85e9f8..5560e14eea 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1495,6 +1495,33 @@ public void testSystemsColumns() throws Exception { + "because they are reserved system columns in Fluss. " + "Please use other names for these columns. 
" + "The reserved system columns are: __offset, __timestamp, __bucket"); + + // Test changelog virtual table metadata columns are also reserved + TableDescriptor changelogColumnsDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("f0", DataTypes.STRING()) + .column("_change_type", DataTypes.STRING()) + .column("_log_offset", DataTypes.BIGINT()) + .column("_commit_timestamp", DataTypes.TIMESTAMP()) + .build()) + .distributedBy(1) + .build(); + + TablePath changelogTablePath = TablePath.of(dbName, "test_changelog_columns"); + + assertThatThrownBy( + () -> + admin.createTable( + changelogTablePath, + changelogColumnsDescriptor, + false) + .get()) + .cause() + .isInstanceOf(InvalidTableException.class) + .hasMessageContaining( + "_change_type, _log_offset, _commit_timestamp cannot be used as column names"); } @Test diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java index 7f2d22dc36..4c8ecbd79a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java @@ -60,10 +60,10 @@ public final class TableDescriptor implements Serializable { public static final String TIMESTAMP_COLUMN_NAME = "__timestamp"; public static final String BUCKET_COLUMN_NAME = "__bucket"; - // Reserved column names for $changelog virtual table metadata - public static final String CHANGELOG_CHANGE_TYPE_COLUMN = "_change_type"; - public static final String CHANGELOG_LOG_OFFSET_COLUMN = "_log_offset"; - public static final String CHANGELOG_COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; + // Reserved column names for virtual table metadata ($changelog and $binlog) + public static final String CHANGE_TYPE_COLUMN = "_change_type"; + public static final String LOG_OFFSET_COLUMN = "_log_offset"; + public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; private final Schema schema; private final @Nullable String comment; diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..10aa76d70a --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.18. 
*/ +public class Flink118ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..265fb80a29 --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.19. */ +public class Flink119ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java new file mode 100644 index 0000000000..8a193c0bde --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22ChangelogVirtualTableITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for {@link ChangelogVirtualTableITCase} in Flink 2.2. 
*/ +public class Flink22ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 65c244efc1..e30a76476d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -85,6 +85,9 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS; @@ -307,9 +310,11 @@ public CatalogBaseTable getTable(ObjectPath objectPath) String tableName = objectPath.getObjectName(); // Check if this is a virtual table ($changelog or $binlog) - if (tableName.endsWith(CHANGELOG_TABLE_SUFFIX)) { + if (tableName.endsWith(CHANGELOG_TABLE_SUFFIX) + && !tableName.contains(LAKE_TABLE_SPLITTER)) { return getVirtualChangelogTable(objectPath); - } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)) { + } else if (tableName.endsWith(BINLOG_TABLE_SUFFIX) + && !tableName.contains(LAKE_TABLE_SPLITTER)) { // TODO: Implement binlog virtual table in future throw new UnsupportedOperationException( String.format( @@ -925,7 +930,7 @@ private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath) newOptions.putAll(securityConfigs); // Create a new CatalogTable with the modified schema - return CatalogTable.of( + return CatalogTableAdapter.toCatalogTable( changelogSchema, baseTable.getComment(), baseTable.getPartitionKeys(), @@ -949,32 +954,12 @@ private Schema buildChangelogSchema(Schema originalSchema) { Schema.Builder builder = Schema.newBuilder(); // Add metadata columns first - builder.column("_change_type", org.apache.flink.table.api.DataTypes.STRING().notNull()); - builder.column("_log_offset", org.apache.flink.table.api.DataTypes.BIGINT().notNull()); - builder.column( - "_commit_timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(3).notNull()); - - // Add all original columns - for (Schema.UnresolvedColumn column : originalSchema.getColumns()) { - if (column instanceof Schema.UnresolvedPhysicalColumn) { - Schema.UnresolvedPhysicalColumn physicalColumn = - (Schema.UnresolvedPhysicalColumn) column; - builder.column(physicalColumn.getName(), physicalColumn.getDataType()); - } else if (column instanceof Schema.UnresolvedComputedColumn) { - Schema.UnresolvedComputedColumn computedColumn = - (Schema.UnresolvedComputedColumn) column; - builder.columnByExpression( - computedColumn.getName(), computedColumn.getExpression()); - } else if (column instanceof Schema.UnresolvedMetadataColumn) { - Schema.UnresolvedMetadataColumn metadataColumn = - (Schema.UnresolvedMetadataColumn) column; - builder.columnByMetadata( - metadataColumn.getName(), - metadataColumn.getDataType(), - metadataColumn.getMetadataKey(), - metadataColumn.isVirtual()); - } - } + builder.column("_change_type", STRING().notNull()); + builder.column("_log_offset", BIGINT().notNull()); + builder.column("_commit_timestamp", TIMESTAMP_LTZ().notNull()); + + // Add all original columns 
(preserves all column attributes including comments) + builder.fromColumns(originalSchema.getColumns()); // Note: We don't copy primary keys or watermarks for virtual tables diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 189df7d3b9..c1770ee33c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -98,6 +98,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); + validateSourceOptions(helper, tableOptions); Optional datalakeFormat = getDatalakeFormat(tableOptions); List prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client.", "fields.")); @@ -109,7 +110,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { == RuntimeExecutionMode.STREAMING; RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); - FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); ZoneId timeZone = FlinkConnectorOptionsUtils.getLocalTimeZone( @@ -274,6 +274,21 @@ private LakeTableFactory mayInitLakeTableFactory() { return lakeTableFactory; } + /** + * Validates table source options using the standard validation pattern. + * + * @param helper the factory helper for option validation + * @param tableOptions the table options to validate + */ + private static void validateSourceOptions( + FactoryUtil.TableFactoryHelper helper, ReadableConfig tableOptions) { + Optional datalakeFormat = getDatalakeFormat(tableOptions); + List prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client.")); + datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + ".")); + helper.validateExcept(prefixesToSkip.toArray(new String[0])); + FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); + } + /** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */ private DynamicTableSource createChangelogTableSource( Context context, ObjectIdentifier tableIdentifier, String tableName) { @@ -301,6 +316,7 @@ private DynamicTableSource createChangelogTableSource( Map catalogTableOptions = context.getCatalogTable().getOptions(); FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); + validateSourceOptions(helper, tableOptions); ZoneId timeZone = FlinkConnectorOptionsUtils.getLocalTimeZone( @@ -310,11 +326,12 @@ private DynamicTableSource createChangelogTableSource( ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); - // bucket.key is set from the base table's primary key int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, dataColumnsType); - // For changelog virtual tables, if bucket key is present, base table has a PK - int[] primaryKeyIndexes = bucketKeyIndexes; + // Changelog/binlog virtual tables are purely log-based and don't have a primary key. + // Setting primaryKeyIndexes to empty ensures the enumerator fetches log-only splits + // (not snapshot splits), which is the correct behavior for virtual tables. 
+ int[] primaryKeyIndexes = new int[0]; // Partition key indexes based on data columns int[] partitionKeyIndexes = @@ -337,8 +354,6 @@ private DynamicTableSource createChangelogTableSource( isStreamingMode, startupOptions, partitionDiscoveryIntervalMs, - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), catalogTableOptions); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index b74da53d65..c8e47dcd5f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -22,7 +22,7 @@ import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; -import org.apache.fluss.metadata.MergeEngineType; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; @@ -37,17 +37,16 @@ import javax.annotation.Nullable; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** A Flink table source for the $changelog virtual table. */ public class ChangelogFlinkTableSource implements ScanTableSource { - private static final String CHANGE_TYPE_COLUMN = "_change_type"; - private static final String LOG_OFFSET_COLUMN = "_log_offset"; - private static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp"; - private final TablePath tablePath; private final Configuration flussConfig; // The changelog output type (includes metadata columns: _change_type, _log_offset, @@ -60,8 +59,6 @@ public class ChangelogFlinkTableSource implements ScanTableSource { private final boolean streaming; private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; private final long scanPartitionDiscoveryIntervalMs; - private final boolean isDataLakeEnabled; - @Nullable private final MergeEngineType mergeEngineType; private final Map tableOptions; // Projection pushdown @@ -70,8 +67,12 @@ public class ChangelogFlinkTableSource implements ScanTableSource { @Nullable private Predicate partitionFilters; - /** Number of metadata columns prepended to the changelog schema. 
*/ - private static final int NUM_METADATA_COLUMNS = 3; + private static final Set METADATA_COLUMN_NAMES = + new HashSet<>( + Arrays.asList( + TableDescriptor.CHANGE_TYPE_COLUMN, + TableDescriptor.LOG_OFFSET_COLUMN, + TableDescriptor.COMMIT_TIMESTAMP_COLUMN)); public ChangelogFlinkTableSource( TablePath tablePath, @@ -83,8 +84,6 @@ public ChangelogFlinkTableSource( boolean streaming, FlinkConnectorOptionsUtils.StartupOptions startupOptions, long scanPartitionDiscoveryIntervalMs, - boolean isDataLakeEnabled, - @Nullable MergeEngineType mergeEngineType, Map tableOptions) { this.tablePath = tablePath; this.flussConfig = flussConfig; @@ -96,11 +95,9 @@ public ChangelogFlinkTableSource( this.streaming = streaming; this.startupOptions = startupOptions; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; - this.isDataLakeEnabled = isDataLakeEnabled; - this.mergeEngineType = mergeEngineType; this.tableOptions = tableOptions; - // Extract data columns by removing the first 3 metadata columns + // Extract data columns by filtering out metadata columns by name this.dataColumnsType = extractDataColumnsType(changelogOutputType); this.producedDataType = changelogOutputType; } @@ -111,14 +108,13 @@ public ChangelogFlinkTableSource( */ private org.apache.flink.table.types.logical.RowType extractDataColumnsType( org.apache.flink.table.types.logical.RowType changelogType) { - List allFields = - changelogType.getFields(); - - // Skip the first NUM_METADATA_COLUMNS fields (metadata columns) + // Filter out metadata columns by name List dataFields = - allFields.subList(NUM_METADATA_COLUMNS, allFields.size()); + changelogType.getFields().stream() + .filter(field -> !METADATA_COLUMN_NAMES.contains(field.getName())) + .collect(Collectors.toList()); - return new org.apache.flink.table.types.logical.RowType(new ArrayList<>(dataFields)); + return new org.apache.flink.table.types.logical.RowType(dataFields); } @Override @@ -171,13 +167,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { projectedFields, offsetsInitializer, scanPartitionDiscoveryIntervalMs, - new ChangelogDeserializationSchema(dataColumnsType), + new ChangelogDeserializationSchema(), streaming, partitionFilters, null); // Lake source not supported if (!streaming) { - // Batch mode + // Batch mode - changelog virtual tables read from log, not data lake return new SourceProvider() { @Override public boolean isBounded() { @@ -186,10 +182,6 @@ public boolean isBounded() { @Override public Source createSource() { - if (!isDataLakeEnabled) { - throw new UnsupportedOperationException( - "Changelog virtual table requires data lake to be enabled for batch queries"); - } return source; } }; @@ -211,8 +203,6 @@ public DynamicTableSource copy() { streaming, startupOptions, scanPartitionDiscoveryIntervalMs, - isDataLakeEnabled, - mergeEngineType, tableOptions); copy.producedDataType = producedDataType; copy.projectedFields = projectedFields; @@ -222,7 +212,7 @@ public DynamicTableSource copy() { @Override public String asSummaryString() { - return "ChangelogFlinkTableSource"; + return "FlussChangelogTableSource"; } // TODO: Implement projection pushdown handling for metadata columns diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java index f307e95a8b..c81141d345 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/ChangelogDeserializationSchema.java @@ -32,9 +32,6 @@ * format with additional changelog metadata columns. */ public class ChangelogDeserializationSchema implements FlussDeserializationSchema { - private static final long serialVersionUID = 1L; - - private final org.apache.flink.table.types.logical.RowType originalTableType; /** * Converter responsible for transforming Fluss row data into Flink's {@link RowData} format @@ -43,10 +40,7 @@ public class ChangelogDeserializationSchema implements FlussDeserializationSchem private transient ChangelogRowConverter converter; /** Creates a new ChangelogDeserializationSchema. */ - public ChangelogDeserializationSchema( - org.apache.flink.table.types.logical.RowType originalTableType) { - this.originalTableType = originalTableType; - } + public ChangelogDeserializationSchema() {} /** Initializes the deserialization schema. */ @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java index a79e168ced..d9d6b1951a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java @@ -28,7 +28,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.RowKind; @@ -41,23 +41,18 @@ */ public class ChangelogRowConverter implements RecordToFlinkRowConverter { - private static final long serialVersionUID = 1L; - private final FlussRowToFlinkRowConverter baseConverter; - private final RowType flussRowType; private final org.apache.flink.table.types.logical.RowType producedType; /** Creates a new ChangelogRowConverter. */ public ChangelogRowConverter(RowType rowType) { - this.flussRowType = rowType; this.baseConverter = new FlussRowToFlinkRowConverter(rowType); this.producedType = buildChangelogRowType(FlinkConversions.toFlinkRowType(rowType)); } /** Converts a LogRecord to a Flink RowData with metadata columns. 
*/ public RowData toChangelogRowData(LogRecord record) { - - RowData physicalRowData = baseConverter.toFlinkRowData(record); + RowData physicalRowData = baseConverter.toFlinkRowData(record.getRow()); // Create metadata row with 3 fields GenericRowData metadataRow = new GenericRowData(3); @@ -108,14 +103,14 @@ public static org.apache.flink.table.types.logical.RowType buildChangelogRowType // Add metadata columns first (using centralized constants from TableDescriptor) fields.add( new org.apache.flink.table.types.logical.RowType.RowField( - TableDescriptor.CHANGELOG_CHANGE_TYPE_COLUMN, new VarCharType(false, 2))); + TableDescriptor.CHANGE_TYPE_COLUMN, new VarCharType(false, 2))); fields.add( new org.apache.flink.table.types.logical.RowType.RowField( - TableDescriptor.CHANGELOG_LOG_OFFSET_COLUMN, new BigIntType(false))); + TableDescriptor.LOG_OFFSET_COLUMN, new BigIntType(false))); fields.add( new org.apache.flink.table.types.logical.RowType.RowField( - TableDescriptor.CHANGELOG_COMMIT_TIMESTAMP_COLUMN, - new TimestampType(false, 3))); + TableDescriptor.COMMIT_TIMESTAMP_COLUMN, + new LocalZonedTimestampType(false, 6))); // Add all original fields fields.addAll(originalType.getFields()); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java deleted file mode 100644 index e3e0b6d2bf..0000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PlainRowConverter.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.fluss.record.LogRecord; -import org.apache.fluss.types.RowType; - -import org.apache.flink.table.data.RowData; - -import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType; - -/** - * Standard converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} without - * any additional metadata columns. - * - *
<p>
This is the default converter used for regular (non-virtual) tables. - */ -public class PlainRowConverter implements RecordToFlinkRowConverter { - - private static final long serialVersionUID = 1L; - - private final FlussRowToFlinkRowConverter baseConverter; - private final org.apache.flink.table.types.logical.RowType flinkRowType; - - /** Creates a new PlainRowConverter. */ - public PlainRowConverter(RowType rowType) { - this.baseConverter = new FlussRowToFlinkRowConverter(rowType); - this.flinkRowType = toFlinkRowType(rowType); - } - - @Override - public RowData convert(LogRecord record) { - return baseConverter.toFlinkRowData(record); - } - - @Override - public org.apache.flink.table.types.logical.RowType getProducedType() { - return flinkRowType; - } -} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index f42ba30ae7..de39d85435 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -885,6 +885,53 @@ void testCreateCatalogWithLakeProperties() throws Exception { .containsEntry("table.datalake.paimon.jdbc.password", "pass"); } + @Test + void testGetChangelogVirtualTable() throws Exception { + // Create a primary key table + tEnv.executeSql( + "CREATE TABLE pk_table_for_changelog (" + + " id INT NOT NULL," + + " name STRING," + + " amount BIGINT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + // Get the $changelog virtual table via catalog API + CatalogTable changelogTable = + (CatalogTable) + catalog.getTable( + new ObjectPath(DEFAULT_DB, "pk_table_for_changelog$changelog")); + + // Build expected schema: metadata columns + original columns + Schema expectedSchema = + Schema.newBuilder() + .column("_change_type", DataTypes.STRING().notNull()) + .column("_log_offset", DataTypes.BIGINT().notNull()) + .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ().notNull()) + .column("id", DataTypes.INT().notNull()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.BIGINT()) + .build(); + + assertThat(changelogTable.getUnresolvedSchema()).isEqualTo(expectedSchema); + + // Verify options are inherited from base table + assertThat(changelogTable.getOptions()).containsEntry("bucket.num", "1"); + + // Verify $changelog on non-PK table throws appropriate error + tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name STRING)"); + + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + DEFAULT_DB, "log_table_for_changelog$changelog"))) + .isInstanceOf(CatalogException.class) + .hasRootCauseMessage( + "Virtual $changelog tables are only supported for primary key tables. " + + "Table fluss.log_table_for_changelog does not have a primary key."); + } + /** * Before Flink 2.1, the {@link Schema} did not include an index field. 
Starting from Flink 2.1, * Flink introduced the concept of an index, and in Fluss, the primary key is considered as an diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 5ed3864eba..ab106ea4a2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.utils.clock.ManualClock; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -56,11 +58,14 @@ /** Integration test for $changelog virtual table functionality. */ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { + protected static final ManualClock CLOCK = new ManualClock(); + @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder() .setClusterConf(new Configuration()) .setNumOfTabletServers(1) + .setClock(CLOCK) .build(); static final String CATALOG_NAME = "testcatalog"; @@ -116,116 +121,69 @@ protected static void deleteRows( } @Test - public void testChangelogVirtualTableWithPrimaryKeyTable() throws Exception { - // Create a primary key table - tEnv.executeSql( - "CREATE TABLE orders (" - + " order_id INT NOT NULL," - + " product_name STRING," - + " amount BIGINT," - + " PRIMARY KEY (order_id) NOT ENFORCED" - + ")"); - - TablePath tablePath = TablePath.of(DEFAULT_DB, "orders"); - - // Insert initial data - List initialRows = - Arrays.asList( - row(1, "Product A", 100L), - row(2, "Product B", 200L), - row(3, "Product C", 300L)); - writeRows(conn, tablePath, initialRows, false); - - // Query the changelog virtual table - String query = "SELECT * FROM orders$changelog"; - CloseableIterator rowIter = tEnv.executeSql(query).collect(); - - // Collect initial inserts (don't close iterator - we need it for more batches) - List results = new ArrayList<>(); - List batch1 = collectRowsWithTimeout(rowIter, 3, false); - results.addAll(batch1); - - // Verify initial data has INSERT change type - for (String result : batch1) { - // Result format: +I[change_type, offset, timestamp, order_id, product_name, amount] - assertThat(result).startsWith("+I[+I,"); - } - - // Update some records - List updateRows = - Arrays.asList(row(1, "Product A Updated", 150L), row(2, "Product B Updated", 250L)); - writeRows(conn, tablePath, updateRows, false); - - // Collect updates (don't close iterator yet) - List batch2 = collectRowsWithTimeout(rowIter, 4, false); - results.addAll(batch2); - - // Verify we see UPDATE_BEFORE (-U) and UPDATE_AFTER (+U) records - long updateBeforeCount = batch2.stream().filter(r -> r.contains("[-U,")).count(); - long updateAfterCount = batch2.stream().filter(r -> r.contains("[+U,")).count(); - assertThat(updateBeforeCount).isEqualTo(2); - assertThat(updateAfterCount).isEqualTo(2); - - // Delete a record using the proper delete 
API - // Note: delete() expects the full row with actual values, not nulls - deleteRows(conn, tablePath, Arrays.asList(row(3, "Product C", 300L))); - - // Collect delete (close iterator after this) - List batch3 = collectRowsWithTimeout(rowIter, 1, true); - results.addAll(batch3); - - // Verify we see DELETE (-D) record - // Note: Fluss DELETE operation produces ChangeType.DELETE which maps to "-D" - // The test verifies that a delete record is captured in the changelog - assertThat(batch3.get(0)).contains("3"); // The deleted row ID should be present - - // Verify metadata columns are present in all records - for (String result : results) { - // Each row should have: change_type, log_offset, timestamp, then original columns - String[] parts = result.substring(3, result.length() - 1).split(", "); - assertThat(parts.length).isGreaterThanOrEqualTo(6); // 3 metadata + 3 data columns - } - } - - @Test - public void testChangelogVirtualTableSchemaIntrospection() throws Exception { - // Create a primary key table + public void testDescribeChangelogTable() throws Exception { + // Create a table with various data types to test complex schema tEnv.executeSql( - "CREATE TABLE products (" + "CREATE TABLE complex_table (" + " id INT NOT NULL," + " name STRING," - + " price DECIMAL(10, 2)," + + " score DOUBLE," + + " is_active BOOLEAN," + + " created_date DATE," + + " metadata MAP," + + " tags ARRAY," + " PRIMARY KEY (id) NOT ENFORCED" + ")"); // Test DESCRIBE on changelog virtual table CloseableIterator describeResult = - tEnv.executeSql("DESCRIBE products$changelog").collect(); + tEnv.executeSql("DESCRIBE complex_table$changelog").collect(); List schemaRows = new ArrayList<>(); while (describeResult.hasNext()) { schemaRows.add(describeResult.next().toString()); } + // Should have 3 metadata columns + 7 data columns = 10 total + assertThat(schemaRows).hasSize(10); + // Verify metadata columns are listed first - // Format: +I[column_name, type, nullable (true/false), ...] 
- assertThat(schemaRows.get(0)).contains("_change_type"); - assertThat(schemaRows.get(0)).contains("STRING"); - // Flink DESCRIBE shows nullability as 'false' for NOT NULL columns - assertThat(schemaRows.get(0)).contains("false"); - - assertThat(schemaRows.get(1)).contains("_log_offset"); - assertThat(schemaRows.get(1)).contains("BIGINT"); - assertThat(schemaRows.get(1)).contains("false"); - - assertThat(schemaRows.get(2)).contains("_commit_timestamp"); - assertThat(schemaRows.get(2)).contains("TIMESTAMP"); - assertThat(schemaRows.get(2)).contains("false"); - - // Verify original columns follow - assertThat(schemaRows.get(3)).contains("id"); - assertThat(schemaRows.get(4)).contains("name"); - assertThat(schemaRows.get(5)).contains("price"); + // DESCRIBE format: +I[name, type, null, key, extras, watermark] + assertThat(schemaRows.get(0)) + .isEqualTo("+I[_change_type, STRING, false, null, null, null]"); + assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, false, null, null, null]"); + assertThat(schemaRows.get(2)) + .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(6), false, null, null, null]"); + + // Verify data columns maintain their types + // Note: Primary key info is not preserved in $changelog virtual table + assertThat(schemaRows.get(3)).isEqualTo("+I[id, INT, false, null, null, null]"); + assertThat(schemaRows.get(4)).isEqualTo("+I[name, STRING, true, null, null, null]"); + assertThat(schemaRows.get(5)).isEqualTo("+I[score, DOUBLE, true, null, null, null]"); + assertThat(schemaRows.get(6)).isEqualTo("+I[is_active, BOOLEAN, true, null, null, null]"); + assertThat(schemaRows.get(7)).isEqualTo("+I[created_date, DATE, true, null, null, null]"); + assertThat(schemaRows.get(8)) + .isEqualTo("+I[metadata, MAP, true, null, null, null]"); + assertThat(schemaRows.get(9)).isEqualTo("+I[tags, ARRAY, true, null, null, null]"); + + // Test SHOW CREATE TABLE on changelog virtual table + CloseableIterator showCreateResult = + tEnv.executeSql("SHOW CREATE TABLE complex_table$changelog").collect(); + + StringBuilder createTableStatement = new StringBuilder(); + while (showCreateResult.hasNext()) { + createTableStatement.append(showCreateResult.next().toString()); + } + + String createStatement = createTableStatement.toString(); + // Verify metadata columns are included in the CREATE TABLE statement + assertThat(createStatement).contains("_change_type"); + assertThat(createStatement).contains("_log_offset"); + assertThat(createStatement).contains("_commit_timestamp"); + // Verify original columns are also included + assertThat(createStatement).contains("id"); + assertThat(createStatement).contains("name"); + assertThat(createStatement).contains("score"); } @Test @@ -249,202 +207,198 @@ public void testChangelogVirtualTableWithNonPrimaryKeyTable() { } @Test - public void testAllChangeTypes() throws Exception { - // Create a primary key table - // Note: Using `val` instead of `value` as `value` is a reserved keyword in Flink SQL + public void testProjectionOnChangelogTable() throws Exception { + // Create a primary key table with 1 bucket and extra columns to test projection tEnv.executeSql( - "CREATE TABLE test_changes (" + "CREATE TABLE projection_test (" + " id INT NOT NULL," - + " val STRING," + + " name STRING," + + " amount BIGINT," + + " description STRING," + " PRIMARY KEY (id) NOT ENFORCED" - + ")"); + + ") WITH ('bucket.num' = '1')"); - TablePath tablePath = TablePath.of(DEFAULT_DB, "test_changes"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "projection_test"); - String 
query = "SELECT _change_type, id, val FROM test_changes$changelog"; + // Select only _change_type, id, and name (skip amount and description) + String query = "SELECT _change_type, id, name FROM projection_test$changelog"; CloseableIterator rowIter = tEnv.executeSql(query).collect(); - // Test INSERT (+I) - writeRows(conn, tablePath, Arrays.asList(row(1, "initial")), false); + // Test INSERT + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1", 100L, "Desc-1")), false); List insertResult = collectRowsWithTimeout(rowIter, 1, false); - assertThat(insertResult.get(0)).startsWith("+I[+I, 1, initial]"); - - // Test UPDATE (-U/+U) - writeRows(conn, tablePath, Arrays.asList(row(1, "updated")), false); + assertThat(insertResult.get(0)).isEqualTo("+I[+I, 1, Item-1]"); + + // Test UPDATE + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows( + conn, + tablePath, + Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated")), + false); List updateResults = collectRowsWithTimeout(rowIter, 2, false); - assertThat(updateResults.get(0)).startsWith("+I[-U, 1, initial]"); - assertThat(updateResults.get(1)).startsWith("+I[+U, 1, updated]"); + assertThat(updateResults.get(0)).isEqualTo("+I[-U, 1, Item-1]"); + assertThat(updateResults.get(1)).isEqualTo("+I[+U, 1, Item-1-Updated]"); - // Test DELETE operation using the proper delete API - deleteRows(conn, tablePath, Arrays.asList(row(1, "updated"))); + // Test DELETE + CLOCK.advanceTime(Duration.ofMillis(100)); + deleteRows( + conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L, "Desc-1-Updated"))); List deleteResult = collectRowsWithTimeout(rowIter, 1, true); - // Verify the delete record contains the row data and has DELETE change type (-D) - // DELETE produces ChangeType.DELETE which maps to "-D" in the changelog - assertThat(deleteResult.get(0)).startsWith("+I[-D, 1, updated]"); + assertThat(deleteResult.get(0)).isEqualTo("+I[-D, 1, Item-1-Updated]"); } @Test - public void testChangelogVirtualTableConcurrentChanges() throws Exception { - // Create a primary key table + public void testChangelogScanWithAllChangeTypes() throws Exception { + // Create a primary key table with 1 bucket for consistent log_offset numbers tEnv.executeSql( - "CREATE TABLE concurrent_test (" + "CREATE TABLE scan_test (" + " id INT NOT NULL," - + " counter INT," + + " name STRING," + + " amount BIGINT," + " PRIMARY KEY (id) NOT ENFORCED" - + ")"); + + ") WITH ('bucket.num' = '1')"); - TablePath tablePath = TablePath.of(DEFAULT_DB, "concurrent_test"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "scan_test"); - // Start collecting from changelog - String query = "SELECT _change_type, id, counter FROM concurrent_test$changelog"; + // Start changelog scan + String query = "SELECT * FROM scan_test$changelog"; CloseableIterator rowIter = tEnv.executeSql(query).collect(); - // Perform multiple concurrent-like changes - for (int i = 1; i <= 5; i++) { - writeRows(conn, tablePath, Arrays.asList(row(i, i * 10)), false); - } + // Insert initial data with controlled timestamp + CLOCK.advanceTime(Duration.ofMillis(1000)); + List initialData = + Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)); + writeRows(conn, tablePath, initialData, false); - // Collect all inserts (don't close iterator - we need it for updates) - List results = collectRowsWithTimeout(rowIter, 5, false); + // Collect and validate inserts - with 1 bucket, offsets are predictable (0, 1) + List results = collectRowsWithTimeout(rowIter, 2, false); + 
assertThat(results).hasSize(2); - // Verify all are inserts - for (String result : results) { - assertThat(result).startsWith("+I[+I,"); - } + // With ManualClock and 1 bucket, we can assert exact row values + // Format: +I[_change_type, _log_offset, _commit_timestamp, id, name, amount] + assertThat(results.get(0)).isEqualTo("+I[+I, 0, 1970-01-01T00:00:01Z, 1, Item-1, 100]"); + assertThat(results.get(1)).isEqualTo("+I[+I, 1, 1970-01-01T00:00:01Z, 2, Item-2, 200]"); - // Update all records - for (int i = 1; i <= 5; i++) { - writeRows(conn, tablePath, Arrays.asList(row(i, i * 20)), false); - } + // Test UPDATE operation with new timestamp + CLOCK.advanceTime(Duration.ofMillis(1000)); + writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false); + + // Collect update records (should get -U and +U) + List updateResults = collectRowsWithTimeout(rowIter, 2, false); + assertThat(updateResults).hasSize(2); + assertThat(updateResults.get(0)) + .isEqualTo("+I[-U, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]"); + assertThat(updateResults.get(1)) + .isEqualTo("+I[+U, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated, 150]"); - // Collect all updates (5 * 2 = 10 records: before and after for each) - // Now we can close the iterator - results = collectRowsWithTimeout(rowIter, 10, true); + // Test DELETE operation with new timestamp + CLOCK.advanceTime(Duration.ofMillis(1000)); + deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L))); - // Verify we have equal number of -U and +U - long updateBeforeCount = results.stream().filter(r -> r.contains("[-U,")).count(); - long updateAfterCount = results.stream().filter(r -> r.contains("[+U,")).count(); - assertThat(updateBeforeCount).isEqualTo(5); - assertThat(updateAfterCount).isEqualTo(5); + // Collect delete record + List deleteResult = collectRowsWithTimeout(rowIter, 1, true); + assertThat(deleteResult).hasSize(1); + assertThat(deleteResult.get(0)) + .isEqualTo("+I[-D, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]"); } @Test - public void testChangelogVirtualTableWithComplexSchema() throws Exception { - // Create a table with various data types + public void testChangelogWithScanStartupMode() throws Exception { + // Create a primary key table with 1 bucket for consistent log_offset numbers tEnv.executeSql( - "CREATE TABLE complex_table (" + "CREATE TABLE startup_mode_test (" + " id INT NOT NULL," + " name STRING," - + " score DOUBLE," - + " is_active BOOLEAN," - + " created_date DATE," - + " metadata MAP," - + " tags ARRAY," + " PRIMARY KEY (id) NOT ENFORCED" - + ")"); - - // Verify the schema includes metadata columns - CloseableIterator describeResult = - tEnv.executeSql("DESCRIBE complex_table$changelog").collect(); - - List schemaRows = new ArrayList<>(); - while (describeResult.hasNext()) { - schemaRows.add(describeResult.next().toString()); + + ") WITH ('bucket.num' = '1')"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_mode_test"); + + // Write first batch of data + CLOCK.advanceTime(Duration.ofMillis(100)); + List batch1 = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + writeRows(conn, tablePath, batch1, false); + + // Write second batch of data + CLOCK.advanceTime(Duration.ofMillis(100)); + List batch2 = Arrays.asList(row(4, "v4"), row(5, "v5")); + writeRows(conn, tablePath, batch2, false); + + // 1. 
Test scan.startup.mode='earliest' - should read all records from beginning + String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */"; + String queryEarliest = + "SELECT _change_type, id, name FROM startup_mode_test$changelog" + optionsEarliest; + CloseableIterator rowIterEarliest = tEnv.executeSql(queryEarliest).collect(); + List earliestResults = collectRowsWithTimeout(rowIterEarliest, 5, true); + assertThat(earliestResults).hasSize(5); + // All should be INSERT change types + for (String result : earliestResults) { + assertThat(result).startsWith("+I[+I,"); } - // Should have 3 metadata columns + 7 data columns = 10 total - assertThat(schemaRows).hasSize(10); - - // Verify metadata columns - assertThat(schemaRows.get(0)).contains("_change_type"); - assertThat(schemaRows.get(1)).contains("_log_offset"); - assertThat(schemaRows.get(2)).contains("_commit_timestamp"); - - // Verify data columns maintain their types - assertThat(schemaRows.get(3)).contains("id"); - assertThat(schemaRows.get(4)).contains("name"); - assertThat(schemaRows.get(5)).contains("score"); - assertThat(schemaRows.get(6)).contains("is_active"); - assertThat(schemaRows.get(7)).contains("created_date"); - assertThat(schemaRows.get(8)).contains("metadata"); - assertThat(schemaRows.get(9)).contains("tags"); + // 2. Test scan.startup.mode='latest' - should only read new records after subscription + String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */"; + String queryLatest = + "SELECT _change_type, id, name FROM startup_mode_test$changelog" + optionsLatest; + CloseableIterator rowIterLatest = tEnv.executeSql(queryLatest).collect(); + + // Write new data after subscribing with 'latest' + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, tablePath, Arrays.asList(row(6, "v6")), false); + List latestResults = collectRowsWithTimeout(rowIterLatest, 1, true); + assertThat(latestResults).hasSize(1); + assertThat(latestResults.get(0)).isEqualTo("+I[+I, 6, v6]"); } @Test - public void testBasicChangelogScanWithMetadataValidation() throws Exception { - // Create a primary key table - // Note: Avoiding `value` as it's a reserved keyword in Flink SQL + public void testChangelogWithPartitionedTable() throws Exception { + // Create a partitioned primary key table with 1 bucket per partition tEnv.executeSql( - "CREATE TABLE scan_test (" + "CREATE TABLE partitioned_test (" + " id INT NOT NULL," + " name STRING," - + " amount BIGINT," - + " PRIMARY KEY (id) NOT ENFORCED" - + ")"); - - TablePath tablePath = TablePath.of(DEFAULT_DB, "scan_test"); + + " region STRING NOT NULL," + + " PRIMARY KEY (id, region) NOT ENFORCED" + + ") PARTITIONED BY (region) WITH ('bucket.num' = '1')"); - // Start changelog scan - String query = "SELECT * FROM scan_test$changelog"; + // Insert data into different partitions using Flink SQL + CLOCK.advanceTime(Duration.ofMillis(100)); + tEnv.executeSql( + "INSERT INTO partitioned_test VALUES " + + "(1, 'Item-1', 'us'), " + + "(2, 'Item-2', 'us'), " + + "(3, 'Item-3', 'eu')") + .await(); + + // Query the changelog virtual table for all partitions + String query = "SELECT _change_type, id, name, region FROM partitioned_test$changelog"; CloseableIterator rowIter = tEnv.executeSql(query).collect(); - // Insert initial data - List initialData = - Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)); - writeRows(conn, tablePath, initialData, false); - - // Collect and validate inserts (don't close iterator - we need it for update/delete tests) - List results = 
collectRowsWithTimeout(rowIter, 2, false); - - // Validate that we received 2 INSERT records - assertThat(results).hasSize(2); + // Collect initial inserts + List results = collectRowsWithTimeout(rowIter, 3, false); + assertThat(results).hasSize(3); - // Validate metadata columns are present and correctly formatted + // Verify all are INSERT change types for (String result : results) { - // Parse the row to validate structure - String[] parts = result.substring(3, result.length() - 1).split(", ", 6); - - // Validate change type column - assertThat(parts[0]).isEqualTo("+I"); - - // Validate log offset column (should be a valid long) - assertThat(Long.parseLong(parts[1])).isGreaterThanOrEqualTo(0); - - // Validate timestamp column exists (we can't predict exact value) - assertThat(parts[2]).isNotEmpty(); - - // Validate data columns follow metadata - int id = Integer.parseInt(parts[3]); - assertThat(id).isIn(1, 2); - assertThat(parts[4]).isIn("Item-1", "Item-2"); - assertThat(Long.parseLong(parts[5])).isIn(100L, 200L); + assertThat(result).startsWith("+I[+I,"); } - // Test an update operation - writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false); + // Verify we have data from both partitions + long usCount = results.stream().filter(r -> r.contains("us")).count(); + long euCount = results.stream().filter(r -> r.contains("eu")).count(); + assertThat(usCount).isEqualTo(2); + assertThat(euCount).isEqualTo(1); - // Collect update records (should get -U and +U) + // Update a record in a specific partition + CLOCK.advanceTime(Duration.ofMillis(100)); + tEnv.executeSql("INSERT INTO partitioned_test VALUES (1, 'Item-1-Updated', 'us')").await(); List updateResults = collectRowsWithTimeout(rowIter, 2, false); assertThat(updateResults).hasSize(2); + assertThat(updateResults.get(0)).contains("-U", "1", "Item-1", "us"); + assertThat(updateResults.get(1)).contains("+U", "1", "Item-1-Updated", "us"); - // Validate UPDATE_BEFORE record - assertThat(updateResults.get(0)).contains("-U"); - assertThat(updateResults.get(0)).contains("Item-1"); - assertThat(updateResults.get(0)).contains("100"); - - // Validate UPDATE_AFTER record - assertThat(updateResults.get(1)).contains("+U"); - assertThat(updateResults.get(1)).contains("Item-1-Updated"); - assertThat(updateResults.get(1)).contains("150"); - - // Test delete operation using the proper delete API - deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L))); - - // Collect delete record - List deleteResult = collectRowsWithTimeout(rowIter, 1, true); - assertThat(deleteResult).hasSize(1); - // Verify the delete record contains the row data (the change type may be -D or -U) - assertThat(deleteResult.get(0)).contains("2"); - assertThat(deleteResult.get(0)).contains("Item-2"); + rowIter.close(); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index cb2afdd935..96282c1b69 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -53,9 +53,9 @@ import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption; import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; -import static 
org.apache.fluss.metadata.TableDescriptor.CHANGELOG_CHANGE_TYPE_COLUMN; -import static org.apache.fluss.metadata.TableDescriptor.CHANGELOG_COMMIT_TIMESTAMP_COLUMN; -import static org.apache.fluss.metadata.TableDescriptor.CHANGELOG_LOG_OFFSET_COLUMN; +import static org.apache.fluss.metadata.TableDescriptor.CHANGE_TYPE_COLUMN; +import static org.apache.fluss.metadata.TableDescriptor.COMMIT_TIMESTAMP_COLUMN; +import static org.apache.fluss.metadata.TableDescriptor.LOG_OFFSET_COLUMN; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.utils.PartitionUtils.PARTITION_KEY_SUPPORTED_TYPES; @@ -70,9 +70,9 @@ public class TableDescriptorValidation { OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME, BUCKET_COLUMN_NAME, - CHANGELOG_CHANGE_TYPE_COLUMN, - CHANGELOG_LOG_OFFSET_COLUMN, - CHANGELOG_COMMIT_TIMESTAMP_COLUMN))); + CHANGE_TYPE_COLUMN, + LOG_OFFSET_COLUMN, + COMMIT_TIMESTAMP_COLUMN))); private static final List KEY_UNSUPPORTED_TYPES = Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW); From 32a242917a6035b43f643222fe329648a7e9c0ee Mon Sep 17 00:00:00 2001 From: Mehul Batra Date: Thu, 22 Jan 2026 15:16:53 +0530 Subject: [PATCH 3/6] fix imports --- .../java/org/apache/fluss/flink/catalog/FlinkTableFactory.java | 2 +- .../apache/fluss/flink/source/ChangelogFlinkTableSource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index c1770ee33c..e93b98aa9c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -23,8 +23,8 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.lake.LakeTableFactory; import org.apache.fluss.flink.sink.FlinkTableSink; -import org.apache.fluss.flink.source.ChangelogFlinkTableSource; import org.apache.fluss.flink.sink.shuffle.DistributionMode; +import org.apache.fluss.flink.source.ChangelogFlinkTableSource; import org.apache.fluss.flink.source.FlinkTableSource; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.metadata.DataLakeFormat; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index c8e47dcd5f..932264787e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -17,9 +17,9 @@ package org.apache.fluss.flink.source; +import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema; -import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.TableDescriptor; From a711cb1dc333e1b597db6975d4ae5b17287079f0 Mon Sep 17 00:00:00 2001 From: Mehul 
Batra Date: Thu, 22 Jan 2026 16:35:29 +0530 Subject: [PATCH 4/6] fix ut --- .../org/apache/fluss/flink/utils/ChangelogRowConverterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java index 594903240a..c569ba86dd 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java @@ -141,7 +141,7 @@ void testProducedTypeHasMetadataColumns() { assertThat(producedType.getTypeAt(1)) .isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class); assertThat(producedType.getTypeAt(2)) - .isInstanceOf(org.apache.flink.table.types.logical.TimestampType.class); + .isInstanceOf(org.apache.flink.table.types.logical.LocalZonedTimestampType.class); } @Test From 5fbacfc4378c99a8be68b3458b640411fa395523 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Fri, 23 Jan 2026 15:55:44 +0800 Subject: [PATCH 5/6] resolve conflicts --- .../org/apache/fluss/flink/catalog/FlinkTableFactory.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index e93b98aa9c..eaea9df233 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -99,11 +99,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); validateSourceOptions(helper, tableOptions); - Optional datalakeFormat = getDatalakeFormat(tableOptions); - List prefixesToSkip = - new ArrayList<>(Arrays.asList("table.", "client.", "fields.")); - datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + ".")); - helper.validateExcept(prefixesToSkip.toArray(new String[0])); boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) @@ -283,7 +278,8 @@ private LakeTableFactory mayInitLakeTableFactory() { private static void validateSourceOptions( FactoryUtil.TableFactoryHelper helper, ReadableConfig tableOptions) { Optional datalakeFormat = getDatalakeFormat(tableOptions); - List prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client.")); + List prefixesToSkip = + new ArrayList<>(Arrays.asList("table.", "client.", "fields.")); datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + ".")); helper.validateExcept(prefixesToSkip.toArray(new String[0])); FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); From 4f113e46fb63c89868fc557b5a1f3e343146ad69 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Fri, 23 Jan 2026 16:50:24 +0800 Subject: [PATCH 6/6] Jark's comments.
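Address Jark's review comments: ChangelogFlinkTableSource no longer carries primaryKeyIndexes/bucketKeyIndexes (the changelog source always reads log-only splits, so hasPrimaryKey is passed as false), and _commit_timestamp is narrowed from TIMESTAMP_LTZ(6) to TIMESTAMP_LTZ(3) to match the millisecond precision of commit timestamps. Below is a minimal sketch of the metadata columns the virtual table exposes after this commit; it assumes the buildChangelogRowType(RowType) helper from ChangelogRowConverter keeps the signature shown in this series, and the example field names are illustrative only:

    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.fluss.flink.utils.ChangelogRowConverter;

    public class ChangelogRowTypeSketch {
        public static void main(String[] args) {
            // Data columns of a primary-key table: id INT NOT NULL, name STRING.
            RowType dataType =
                    RowType.of(
                            new LogicalType[] {
                                new IntType(false), new VarCharType(VarCharType.MAX_LENGTH)
                            },
                            new String[] {"id", "name"});
            // Prepends _change_type STRING NOT NULL, _log_offset BIGINT NOT NULL and
            // _commit_timestamp TIMESTAMP_LTZ(3) NOT NULL (TIMESTAMP_LTZ(6) before this commit)
            // in front of the data columns.
            RowType changelogType = ChangelogRowConverter.buildChangelogRowType(dataType);
            System.out.println(changelogType.asSummaryString());
        }
    }
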
--- .../fluss/flink/catalog/FlinkCatalog.java | 2 +- .../flink/catalog/FlinkTableFactory.java | 9 --- .../source/ChangelogFlinkTableSource.java | 18 ++---- .../flink/utils/ChangelogRowConverter.java | 2 +- .../flink/catalog/FlinkCatalogITCase.java | 10 +-- .../source/ChangelogVirtualTableITCase.java | 62 ++++++++++++------- 6 files changed, 52 insertions(+), 51 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index e30a76476d..e39880add2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -956,7 +956,7 @@ private Schema buildChangelogSchema(Schema originalSchema) { // Add metadata columns first builder.column("_change_type", STRING().notNull()); builder.column("_log_offset", BIGINT().notNull()); - builder.column("_commit_timestamp", TIMESTAMP_LTZ().notNull()); + builder.column("_commit_timestamp", TIMESTAMP_LTZ(3).notNull()); // Add all original columns (preserves all column attributes including comments) builder.fromColumns(originalSchema.getColumns()); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index eaea9df233..5a5c0278e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -322,13 +322,6 @@ private DynamicTableSource createChangelogTableSource( ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); - int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, dataColumnsType); - - // Changelog/binlog virtual tables are purely log-based and don't have a primary key. - // Setting primaryKeyIndexes to empty ensures the enumerator fetches log-only splits - // (not snapshot splits), which is the correct behavior for virtual tables. 
- int[] primaryKeyIndexes = new int[0]; - // Partition key indexes based on data columns int[] partitionKeyIndexes = resolvedCatalogTable.getPartitionKeys().stream() @@ -344,8 +337,6 @@ private DynamicTableSource createChangelogTableSource( TablePath.of(tableIdentifier.getDatabaseName(), baseTableName), toFlussClientConfig(catalogTableOptions, context.getConfiguration()), tableOutputType, - primaryKeyIndexes, - bucketKeyIndexes, partitionKeyIndexes, isStreamingMode, startupOptions, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index 932264787e..3883c43086 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -53,8 +53,6 @@ public class ChangelogFlinkTableSource implements ScanTableSource { // _commit_timestamp) private final org.apache.flink.table.types.logical.RowType changelogOutputType; private final org.apache.flink.table.types.logical.RowType dataColumnsType; - private final int[] primaryKeyIndexes; - private final int[] bucketKeyIndexes; private final int[] partitionKeyIndexes; private final boolean streaming; private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; @@ -78,8 +76,6 @@ public ChangelogFlinkTableSource( TablePath tablePath, Configuration flussConfig, org.apache.flink.table.types.logical.RowType changelogOutputType, - int[] primaryKeyIndexes, - int[] bucketKeyIndexes, int[] partitionKeyIndexes, boolean streaming, FlinkConnectorOptionsUtils.StartupOptions startupOptions, @@ -89,8 +85,6 @@ public ChangelogFlinkTableSource( this.flussConfig = flussConfig; // The changelogOutputType already includes metadata columns from FlinkCatalog this.changelogOutputType = changelogOutputType; - this.primaryKeyIndexes = primaryKeyIndexes; - this.bucketKeyIndexes = bucketKeyIndexes; this.partitionKeyIndexes = partitionKeyIndexes; this.streaming = streaming; this.startupOptions = startupOptions; @@ -161,7 +155,11 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { new FlinkSource<>( flussConfig, tablePath, - hasPrimaryKey(), + // Changelog/binlog virtual tables are purely log-based and don't have a + // primary key, setting hasPrimaryKey "false" to ensure the enumerator + // fetches log-only splits (not snapshot splits), which is the correct + // behavior for virtual tables. 
+ false, isPartitioned(), flussRowType, projectedFields, @@ -197,8 +195,6 @@ public DynamicTableSource copy() { tablePath, flussConfig, changelogOutputType, - primaryKeyIndexes, - bucketKeyIndexes, partitionKeyIndexes, streaming, startupOptions, @@ -218,10 +214,6 @@ public String asSummaryString() { // TODO: Implement projection pushdown handling for metadata columns // TODO: Implement filter pushdown - private boolean hasPrimaryKey() { - return primaryKeyIndexes.length > 0; - } - private boolean isPartitioned() { return partitionKeyIndexes.length > 0; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java index d9d6b1951a..6b8081a6d1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java @@ -110,7 +110,7 @@ public static org.apache.flink.table.types.logical.RowType buildChangelogRowType fields.add( new org.apache.flink.table.types.logical.RowType.RowField( TableDescriptor.COMMIT_TIMESTAMP_COLUMN, - new LocalZonedTimestampType(false, 6))); + new LocalZonedTimestampType(false, 3))); // Add all original fields fields.addAll(originalType.getFields()); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index de39d85435..5381b4672e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -893,8 +893,9 @@ void testGetChangelogVirtualTable() throws Exception { + " id INT NOT NULL," + " name STRING," + " amount BIGINT," - + " PRIMARY KEY (id) NOT ENFORCED" - + ") WITH ('bucket.num' = '1')"); + + " PRIMARY KEY (id, name) NOT ENFORCED" + + ") PARTITIONED BY (name) " + + "WITH ('bucket.num' = '1')"); // Get the $changelog virtual table via catalog API CatalogTable changelogTable = @@ -907,13 +908,14 @@ void testGetChangelogVirtualTable() throws Exception { Schema.newBuilder() .column("_change_type", DataTypes.STRING().notNull()) .column("_log_offset", DataTypes.BIGINT().notNull()) - .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ().notNull()) + .column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ(3).notNull()) .column("id", DataTypes.INT().notNull()) - .column("name", DataTypes.STRING()) + .column("name", DataTypes.STRING().notNull()) .column("amount", DataTypes.BIGINT()) .build(); assertThat(changelogTable.getUnresolvedSchema()).isEqualTo(expectedSchema); + assertThat(changelogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("name")); // Verify options are inherited from base table assertThat(changelogTable.getOptions()).containsEntry("bucket.num", "1"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index ab106ea4a2..a597b37cdb 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; @@ -100,6 +101,8 @@ void before() { tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); tEnv.executeSql("create database " + DEFAULT_DB); tEnv.useDatabase(DEFAULT_DB); + // reset clock before each test + CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS); } @AfterEach @@ -153,7 +156,7 @@ public void testDescribeChangelogTable() throws Exception { .isEqualTo("+I[_change_type, STRING, false, null, null, null]"); assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, false, null, null, null]"); assertThat(schemaRows.get(2)) - .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(6), false, null, null, null]"); + .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, null, null, null]"); // Verify data columns maintain their types // Note: Primary key info is not preserved in $changelog virtual table @@ -177,13 +180,21 @@ public void testDescribeChangelogTable() throws Exception { String createStatement = createTableStatement.toString(); // Verify metadata columns are included in the CREATE TABLE statement - assertThat(createStatement).contains("_change_type"); - assertThat(createStatement).contains("_log_offset"); - assertThat(createStatement).contains("_commit_timestamp"); - // Verify original columns are also included - assertThat(createStatement).contains("id"); - assertThat(createStatement).contains("name"); - assertThat(createStatement).contains("score"); + assertThat(createStatement) + .contains( + "CREATE TABLE `testcatalog`.`test_changelog_db`.`complex_table$changelog` (\n" + + " `_change_type` VARCHAR(2147483647) NOT NULL,\n" + + " `_log_offset` BIGINT NOT NULL,\n" + + " `_commit_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n" + + " `id` INT NOT NULL,\n" + + " `name` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `is_active` BOOLEAN,\n" + + " `created_date` DATE,\n" + + " `metadata` MAP,\n" + + " `tags` ARRAY\n" + // with options contains random properties, skip checking + + ")"); } @Test @@ -350,6 +361,20 @@ public void testChangelogWithScanStartupMode() throws Exception { List latestResults = collectRowsWithTimeout(rowIterLatest, 1, true); assertThat(latestResults).hasSize(1); assertThat(latestResults.get(0)).isEqualTo("+I[+I, 6, v6]"); + + // 3. 
Test scan.startup.mode='timestamp' - should read records from specific timestamp + // read between batch1 and batch2 + String optionsTimestamp = + " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '150') */"; + String queryTimestamp = "SELECT * FROM startup_mode_test$changelog " + optionsTimestamp; + CloseableIterator rowIterTimestamp = tEnv.executeSql(queryTimestamp).collect(); + List timestampResults = collectRowsWithTimeout(rowIterTimestamp, 2, true); + assertThat(timestampResults).hasSize(2); + // Should contain records from batch2 only + assertThat(timestampResults) + .containsExactlyInAnyOrder( + "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]", + "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); } @Test @@ -378,26 +403,17 @@ public void testChangelogWithPartitionedTable() throws Exception { // Collect initial inserts List results = collectRowsWithTimeout(rowIter, 3, false); - assertThat(results).hasSize(3); - - // Verify all are INSERT change types - for (String result : results) { - assertThat(result).startsWith("+I[+I,"); - } - - // Verify we have data from both partitions - long usCount = results.stream().filter(r -> r.contains("us")).count(); - long euCount = results.stream().filter(r -> r.contains("eu")).count(); - assertThat(usCount).isEqualTo(2); - assertThat(euCount).isEqualTo(1); + List expectedResults = + Arrays.asList( + "+I[+I, 1, Item-1, us]", "+I[+I, 2, Item-2, us]", "+I[+I, 3, Item-3, eu]"); + assertThat(results).isEqualTo(expectedResults); // Update a record in a specific partition CLOCK.advanceTime(Duration.ofMillis(100)); tEnv.executeSql("INSERT INTO partitioned_test VALUES (1, 'Item-1-Updated', 'us')").await(); List updateResults = collectRowsWithTimeout(rowIter, 2, false); - assertThat(updateResults).hasSize(2); - assertThat(updateResults.get(0)).contains("-U", "1", "Item-1", "us"); - assertThat(updateResults.get(1)).contains("+U", "1", "Item-1-Updated", "us"); + expectedResults = Arrays.asList("+I[-U, 1, Item-1, us]", "+I[+U, 1, Item-1-Updated, us]"); + assertThat(updateResults).isEqualTo(expectedResults); rowIter.close(); }
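
For reviewers, a short end-to-end sketch of how the $changelog virtual table exercised by the ITCases above is meant to be consumed. It mirrors the SQL used in the tests; the catalog registration, bootstrap address and the orders table are placeholders rather than part of this patch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public class ChangelogQuerySketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // Placeholder catalog registration; point it at a real Fluss cluster.
            tEnv.executeSql(
                    "CREATE CATALOG fluss_catalog WITH ("
                            + "'type' = 'fluss', 'bootstrap.servers' = 'localhost:9123')");
            tEnv.executeSql("USE CATALOG fluss_catalog");

            // Read the full changelog of a primary-key table, including the three metadata
            // columns the virtual table prepends. collect() is an unbounded streaming read,
            // as in the ITCases.
            try (CloseableIterator<Row> it =
                    tEnv.executeSql(
                                    "SELECT _change_type, _log_offset, _commit_timestamp, id, name "
                                            + "FROM orders$changelog "
                                            + "/*+ OPTIONS('scan.startup.mode' = 'earliest') */")
                            .collect()) {
                while (it.hasNext()) {
                    // e.g. +I[+I, 0, 2026-01-23T08:00:00.123Z, 1, Item-1]
                    System.out.println(it.next());
                }
            }
        }
    }
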