From 793d437af26a053f2202a829aed29648d0c2367b Mon Sep 17 00:00:00 2001 From: Liebing Date: Wed, 21 Jan 2026 17:03:05 +0800 Subject: [PATCH] [flink] Support custom properties in Flink table factory --- .../fluss/flink/catalog/FlinkTableFactory.java | 15 --------------- .../fluss/flink/catalog/FlinkCatalogITCase.java | 17 ++++------------- .../flink/catalog/FlinkTableFactoryTest.java | 10 ++++++---- 3 files changed, 10 insertions(+), 32 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 48d74e712e..7ddb113d16 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -26,7 +26,6 @@ import org.apache.fluss.flink.sink.shuffle.DistributionMode; import org.apache.fluss.flink.source.FlinkTableSource; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; -import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.TablePath; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -48,20 +47,17 @@ import org.apache.flink.table.types.logical.RowType; import java.time.ZoneId; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT; import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX; import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; -import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys; import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkOption; @@ -92,11 +88,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); - Optional datalakeFormat = getDatalakeFormat(tableOptions); - List prefixesToSkip = - new ArrayList<>(Arrays.asList("table.", "client.", "fields.")); - datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + ".")); - helper.validateExcept(prefixesToSkip.toArray(new String[0])); boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) @@ -160,12 +151,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { public DynamicTableSink createDynamicTableSink(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); - Optional datalakeFormat = getDatalakeFormat(tableOptions); - if (datalakeFormat.isPresent()) { - helper.validateExcept("table.", "client.", "fields.", datalakeFormat.get() + "."); - } else { - helper.validateExcept("table.", "client.", "fields."); - } boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index f42ba30ae7..58f8a7b862 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -757,20 +757,11 @@ void testCreateTableWithUnknownOptions() { "create table test_table_other_unknown_options (a int, b int)" + " with ('connector' = 'fluss', 'bootstrap.servers' = 'localhost:9092', 'other.unknown.option' = 'other-unknown-val')"); - // test invalid table as source - assertThatThrownBy(() -> tEnv.explainSql("select * from test_table_other_unknown_options")) - .cause() - .isInstanceOf(ValidationException.class) - .hasMessageContaining("Unsupported options found for 'fluss'"); + // test table with unknown option as source + tEnv.explainSql("select * from test_table_other_unknown_options"); - // test invalid table as sink - assertThatThrownBy( - () -> - tEnv.explainSql( - "insert into test_table_other_unknown_options values (1, 2)")) - .cause() - .isInstanceOf(ValidationException.class) - .hasMessageContaining("Unsupported options found for 'fluss'"); + // test table with unknown option as sink + tEnv.explainSql("insert into test_table_other_unknown_options values (1, 2)"); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java index cf7a494997..c1d136cb4c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java @@ -77,10 +77,8 @@ void testTableSourceOptions() { Map validProperties = getBasicOptions(); validProperties.put("k1", "v1"); - // test invalid options - assertThatThrownBy(() -> createTableSource(schema, validProperties)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining("Unsupported options:\n" + "\n" + "k1"); + // test create table source with custom properties is ok + createTableSource(schema, validProperties); // test scan startup mode options Map scanModeProperties = getBasicOptions(); @@ -178,6 +176,10 @@ void testSink() { List bucketKeys = tableSink.getBucketKeys(); assertThat(bucketKeys) .isEqualTo(Arrays.asList(properties.get(BUCKET_KEY.key()).split(","))); + + // test create table sink with custom properties is ok + properties.put("k1", "v1"); + createTableSink(schema, properties); } private ResolvedSchema createBasicSchema() {