3 changes: 3 additions & 0 deletions .gitignore
@@ -22,6 +22,9 @@ dependency-reduced-pom.xml
### VS Code ###
.vscode/

### claude code ###
.claude/

### Mac OS ###
.DS_Store

@@ -1495,6 +1495,33 @@ public void testSystemsColumns() throws Exception {
+ "because they are reserved system columns in Fluss. "
+ "Please use other names for these columns. "
+ "The reserved system columns are: __offset, __timestamp, __bucket");

// Test changelog virtual table metadata columns are also reserved
TableDescriptor changelogColumnsDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("_change_type", DataTypes.STRING())
.column("_log_offset", DataTypes.BIGINT())
.column("_commit_timestamp", DataTypes.TIMESTAMP())
.build())
.distributedBy(1)
.build();

TablePath changelogTablePath = TablePath.of(dbName, "test_changelog_columns");

assertThatThrownBy(
() ->
admin.createTable(
changelogTablePath,
changelogColumnsDescriptor,
false)
.get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining(
"_change_type, _log_offset, _commit_timestamp cannot be used as column names");
}

@Test
@@ -60,6 +60,11 @@ public final class TableDescriptor implements Serializable {
public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
public static final String BUCKET_COLUMN_NAME = "__bucket";

// Reserved column names for virtual table metadata ($changelog and $binlog)
public static final String CHANGE_TYPE_COLUMN = "_change_type";
public static final String LOG_OFFSET_COLUMN = "_log_offset";
public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";

private final Schema schema;
private final @Nullable String comment;
private final List<String> partitionKeys;
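The new constants give the server a single source of truth for the reserved metadata names. Below is a minimal, self-contained sketch of the kind of check the error message in the test above implies; `validateColumnNames` is a hypothetical helper, and the actual Fluss-side validation may live elsewhere and throw `InvalidTableException` instead:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class ReservedColumnCheck {

    // Mirrors TableDescriptor.CHANGE_TYPE_COLUMN, LOG_OFFSET_COLUMN, COMMIT_TIMESTAMP_COLUMN.
    private static final Set<String> RESERVED =
            new HashSet<>(Arrays.asList("_change_type", "_log_offset", "_commit_timestamp"));

    /** Rejects column names that clash with reserved virtual-table metadata columns. */
    static void validateColumnNames(List<String> columnNames) {
        List<String> clashes =
                columnNames.stream().filter(RESERVED::contains).collect(Collectors.toList());
        if (!clashes.isEmpty()) {
            // The real implementation would throw InvalidTableException; this sketch
            // uses IllegalArgumentException to stay dependency-free.
            throw new IllegalArgumentException(
                    String.join(", ", clashes) + " cannot be used as column names");
        }
    }

    public static void main(String[] args) {
        // Fails with "_change_type, _log_offset cannot be used as column names".
        validateColumnNames(Arrays.asList("f0", "_change_type", "_log_offset"));
    }
}
```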
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.18. */
public class Flink118ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.19. */
public class Flink119ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.20. */
public class Flink120ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 2.2. */
public class Flink22ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
@@ -85,6 +85,9 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
import static org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS;
@@ -112,6 +115,8 @@
public class FlinkCatalog extends AbstractCatalog {

public static final String LAKE_TABLE_SPLITTER = "$lake";
public static final String CHANGELOG_TABLE_SUFFIX = "$changelog";
public static final String BINLOG_TABLE_SUFFIX = "$binlog";

protected final ClassLoader classLoader;

@@ -303,6 +308,20 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
// maybe this should be resolved as a datalake table
String tableName = objectPath.getObjectName();

// Check if this is a virtual table ($changelog or $binlog)
if (tableName.endsWith(CHANGELOG_TABLE_SUFFIX)
&& !tableName.contains(LAKE_TABLE_SPLITTER)) {
return getVirtualChangelogTable(objectPath);
} else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)
&& !tableName.contains(LAKE_TABLE_SPLITTER)) {
// TODO: Implement binlog virtual table in the future
throw new UnsupportedOperationException(
String.format(
"$binlog virtual tables are not yet supported for table %s",
objectPath));
}

TablePath tablePath = toTablePath(objectPath);
try {
TableInfo tableInfo;
@@ -862,4 +881,83 @@ private static boolean isPrefixList(List<String> fullList, List<String> prefixLi
}
return true;
}

/**
* Creates a virtual $changelog table by extending the base table's schema with metadata columns.
*/
private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
// Extract the base table name (remove $changelog suffix)
String virtualTableName = objectPath.getObjectName();
String baseTableName =
virtualTableName.substring(
0, virtualTableName.length() - CHANGELOG_TABLE_SUFFIX.length());

// Get the base table
ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName);
TablePath baseTablePath = toTablePath(baseObjectPath);

try {
// Retrieve base table info
TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();

// $changelog is supported for both PK tables and log tables:
// - PK tables: have change types +I, -U, +U, -D
// - Log tables: only have +A (append-only)

// Convert to Flink table
CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo);

if (!(catalogBaseTable instanceof CatalogTable)) {
throw new UnsupportedOperationException(
"Virtual $changelog tables are only supported for regular tables");
}

CatalogTable baseTable = (CatalogTable) catalogBaseTable;

// Build the changelog schema by adding metadata columns
Schema originalSchema = baseTable.getUnresolvedSchema();
Schema changelogSchema = buildChangelogSchema(originalSchema);

// Copy options from base table
Map<String, String> newOptions = new HashMap<>(baseTable.getOptions());
newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
newOptions.putAll(securityConfigs);

// Create a new CatalogTable with the modified schema
return CatalogTableAdapter.toCatalogTable(
changelogSchema,
baseTable.getComment(),
baseTable.getPartitionKeys(),
newOptions);

} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
if (isTableNotExist(t)) {
throw new TableNotExistException(getName(), baseObjectPath);
} else {
throw new CatalogException(
String.format(
"Failed to get virtual changelog table %s in %s",
objectPath, getName()),
t);
}
}
}

private Schema buildChangelogSchema(Schema originalSchema) {
Schema.Builder builder = Schema.newBuilder();

// Add metadata columns first
builder.column("_change_type", STRING().notNull());
builder.column("_log_offset", BIGINT().notNull());
builder.column("_commit_timestamp", TIMESTAMP_LTZ().notNull());

// Add all original columns (preserves all column attributes including comments)
builder.fromColumns(originalSchema.getColumns());

// Note: We don't copy primary keys or watermarks for virtual tables

return builder.build();
}
}
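End to end, the catalog resolution above means a user can query the virtual table directly from Flink SQL. A hedged usage sketch follows; the catalog, table, and data column names are illustrative, not part of this diff:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public final class ChangelogQueryExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a Fluss catalog named "fluss" with a base table "orders";
        // both names are illustrative.
        tEnv.executeSql("USE CATALOG fluss");

        // getVirtualChangelogTable prepends the three metadata columns, so they
        // can be selected alongside the base table's own columns. For a PK table
        // _change_type is one of +I/-U/+U/-D; for a log table it is +A.
        tEnv.executeSql(
                        "SELECT _change_type, _log_offset, _commit_timestamp, order_id "
                                + "FROM `orders$changelog`")
                .print();
    }
}
```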
@@ -24,6 +24,7 @@
import org.apache.fluss.flink.lake.LakeTableFactory;
import org.apache.fluss.flink.sink.FlinkTableSink;
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
import org.apache.fluss.flink.source.FlinkTableSource;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
import org.apache.fluss.metadata.DataLakeFormat;
@@ -90,20 +91,20 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return lakeTableFactory.createDynamicTableSource(context, lakeTableName);
}

// Check whether the table name ends with the $changelog suffix
if (tableName.endsWith(FlinkCatalog.CHANGELOG_TABLE_SUFFIX)) {
return createChangelogTableSource(context, tableIdentifier, tableName);
}

FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
List<String> prefixesToSkip =
new ArrayList<>(Arrays.asList("table.", "client.", "fields."));
datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + "."));
helper.validateExcept(prefixesToSkip.toArray(new String[0]));
validateSourceOptions(helper, tableOptions);

boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType();
FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);

ZoneId timeZone =
FlinkConnectorOptionsUtils.getLocalTimeZone(
@@ -267,4 +268,87 @@ private LakeTableFactory mayInitLakeTableFactory() {
}
return lakeTableFactory;
}

/**
* Validates table source options using the standard validation pattern.
*
* @param helper the factory helper for option validation
* @param tableOptions the table options to validate
*/
private static void validateSourceOptions(
FactoryUtil.TableFactoryHelper helper, ReadableConfig tableOptions) {
Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
List<String> prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client."));
datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + "."));
helper.validateExcept(prefixesToSkip.toArray(new String[0]));
FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
}

/** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */
private DynamicTableSource createChangelogTableSource(
Context context, ObjectIdentifier tableIdentifier, String tableName) {
// Extract the base table name by removing the $changelog suffix
String baseTableName =
tableName.substring(
0, tableName.length() - FlinkCatalog.CHANGELOG_TABLE_SUFFIX.length());

boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

// tableOutputType includes metadata columns: [_change_type, _log_offset, _commit_timestamp,
// data_cols...]
RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType();

// Extract data columns type (skip the 3 metadata columns) for index calculations
int numMetadataColumns = 3;
List<RowType.RowField> dataFields =
tableOutputType
.getFields()
.subList(numMetadataColumns, tableOutputType.getFieldCount());
RowType dataColumnsType = new RowType(new ArrayList<>(dataFields));

Map<String, String> catalogTableOptions = context.getCatalogTable().getOptions();
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
validateSourceOptions(helper, tableOptions);

ZoneId timeZone =
FlinkConnectorOptionsUtils.getLocalTimeZone(
context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
final FlinkConnectorOptionsUtils.StartupOptions startupOptions =
FlinkConnectorOptionsUtils.getStartupOptions(tableOptions, timeZone);

ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();

int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, dataColumnsType);

// Changelog/binlog virtual tables are purely log-based and don't have a primary key.
// Setting primaryKeyIndexes to empty ensures the enumerator fetches log-only splits
// (not snapshot splits), which is the correct behavior for virtual tables.
int[] primaryKeyIndexes = new int[0];

// Partition key indexes based on data columns
int[] partitionKeyIndexes =
resolvedCatalogTable.getPartitionKeys().stream()
.mapToInt(dataColumnsType::getFieldIndex)
.toArray();

long partitionDiscoveryIntervalMs =
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();

return new ChangelogFlinkTableSource(
TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
toFlussClientConfig(catalogTableOptions, context.getConfiguration()),
tableOutputType,
primaryKeyIndexes,
bucketKeyIndexes,
partitionKeyIndexes,
isStreamingMode,
startupOptions,
partitionDiscoveryIntervalMs,
catalogTableOptions);
}
}
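Note that this method and FlinkCatalog#getVirtualChangelogTable strip the $changelog suffix with the same substring arithmetic. A small sketch of a shared helper that could fold the two call sites together (hypothetical, not part of this diff):

```java
public final class VirtualTableNames {

    /**
     * Returns the base table name for a virtual table name, e.g.
     * "orders$changelog" -> "orders". Hypothetical refactoring target for the
     * duplicated substring logic in FlinkCatalog and FlinkTableFactory.
     */
    static String stripVirtualSuffix(String tableName, String suffix) {
        if (!tableName.endsWith(suffix)) {
            throw new IllegalArgumentException(
                    String.format("Table name '%s' does not end with '%s'", tableName, suffix));
        }
        return tableName.substring(0, tableName.length() - suffix.length());
    }
}
```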