From 4d2fab7d4b3974d4390a23e6050267669756eca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Tue, 13 Jan 2026 16:36:03 +0800 Subject: [PATCH 1/4] add lake paimon ddl ut add tiering test base --- .../fluss-spark-lake-paimon/pom.xml | 201 +++++++++++++++++ ...e.fluss.lake.lakestorage.LakeStoragePlugin | 19 ++ .../paimon/SparkLakePaimonCatalogTest.scala | 205 ++++++++++++++++++ .../lake/paimon/SparkLakePaimonTestBase.scala | 145 +++++++++++++ fluss-spark/fluss-spark-lake/pom.xml | 59 +++++ fluss-spark/pom.xml | 1 + 6 files changed, 630 insertions(+) create mode 100644 fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml create mode 100644 fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin create mode 100644 fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala create mode 100644 fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala create mode 100644 fluss-spark/fluss-spark-lake/pom.xml diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml new file mode 100644 index 0000000000..bf285b599d --- /dev/null +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml @@ -0,0 +1,201 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark-lake + 0.9-SNAPSHOT + + + fluss-spark-lake-paimon_${scala.binary.version} + Fluss : Engine Spark : Lake : Paimon + + + 1.3.1 + test + + + + + org.apache.fluss + fluss-spark-common_${scala.binary.version} + ${project.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + tests + test + + + + org.apache.fluss + fluss-client + ${project.version} + test + + + + org.apache.fluss + fluss-common + ${project.version} + tests + test + + + + org.apache.fluss + fluss-lake-paimon + ${project.version} + + + + + org.eclipse.collections + eclipse-collections + 11.1.0 + test + + + + org.apache.fluss + fluss-test-utils + + + + + org.apache.curator + curator-test + ${curator.version} + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + tests + test + + + + junit + junit + 3.8.1 + test + + + + org.apache.flink + flink-core + ${flink.version} + ${tiering.deps.scope} + + + org.apache.flink + flink-table-common + ${flink.version} + ${tiering.deps.scope} + + + org.apache.flink + flink-table-runtime + ${flink.version} + ${tiering.deps.scope} + + + org.apache.flink + flink-connector-base + ${flink.version} + ${tiering.deps.scope} + + + org.apache.flink + flink-connector-files + ${flink.version} + ${tiering.deps.scope} + + + org.apache.flink + flink-table-test-utils + ${flink.version} + ${tiering.deps.scope} + + + org.apache.fluss + fluss-flink-${flink.major.version} + ${project.version} + ${tiering.deps.scope} + + + org.apache.paimon + paimon-bundle + ${paimon.version} + ${tiering.deps.scope} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + + diff --git 
a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin new file mode 100644 index 0000000000..69bf0f8a4b --- /dev/null +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin \ No newline at end of file diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala new file mode 100644 index 0000000000..deb14fce82 --- /dev/null +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.lake.paimon + +import org.apache.fluss.config.ConfigOptions +import org.apache.fluss.metadata._ +import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME} +import org.apache.fluss.spark.SparkCatalog +import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER} +import org.apache.fluss.types.{DataTypes, RowType} + +import org.apache.paimon.CoreOptions +import org.apache.paimon.table.Table +import org.apache.spark.sql.Row +import org.apache.spark.sql.connector.catalog.Identifier +import org.assertj.core.api.Assertions.{assertThat, assertThatList} + +import scala.collection.JavaConverters._ + +class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { + + private def verifyPaimonTable( + paimonTable: Table, + flussTable: TableInfo, + expectedPaimonRowType: org.apache.paimon.types.RowType, + expectedBucketKey: String, + bucketNum: Int): Unit = { + if (!flussTable.hasPrimaryKey) { + assert(paimonTable.primaryKeys.isEmpty) + } else { + assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "primary key")( + paimonTable.primaryKeys()) + } + + if (flussTable.isPartitioned) { + assertResult(flussTable.getPartitionKeys, "partition keys")(paimonTable.partitionKeys()) + } + + assert(flussTable.getNumBuckets == bucketNum) + assert(paimonTable.options().get(CoreOptions.BUCKET.key()).toInt == bucketNum) + if (flussTable.hasBucketKey) { + assertResult(expectedBucketKey)(flussTable.getBucketKeys.asScala.mkString(",")) + assertResult(expectedBucketKey)(paimonTable.options().get(CoreOptions.BUCKET_KEY.key())) + } + + val paimonRowType = paimonTable.rowType + assert(paimonRowType.equals(expectedPaimonRowType)) + assert(flussTable.getComment.equals(paimonTable.comment())) + } + + test("Lake Catalog: basic table") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() + val paimonTable = + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + verifyPaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + } + + test("Catalog: show tables") { + withTable("test_tbl", "test_tbl1", "tbl_a") { + sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) COMMENT 'my test table'") + sql( + s"CREATE TABLE $DEFAULT_DATABASE.test_tbl1 (id int, name string) COMMENT 'my test table1'") + sql(s"CREATE TABLE $DEFAULT_DATABASE.tbl_a (id int, name string) COMMENT 'my table a'") + + checkAnswer( + sql("SHOW TABLES"), + Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( + "fluss", + "tbl_a", + false) :: Nil) + + checkAnswer( + sql(s"SHOW TABLES in $DEFAULT_DATABASE"), + Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( + "fluss", + "tbl_a", + false) :: Nil) + + checkAnswer( + sql(s"SHOW TABLES 
from $DEFAULT_DATABASE"), + Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( + "fluss", + "tbl_a", + false) :: Nil) + + checkAnswer( + sql(s"SHOW TABLES from $DEFAULT_DATABASE like 'test_*'"), + Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Nil) + } + } + + test("Catalog: primary-key table") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string) + |PARTITIONED BY (pt) + |TBLPROPERTIES("primary.key" = "id,pt") + |""".stripMargin) + + val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() + assertThatList(tbl1.getPrimaryKeys).hasSameElementsAs(Seq("id", "pt").toList.asJava) + assertThat(tbl1.getNumBuckets).isEqualTo(1) + assertThat(tbl1.getBucketKeys.contains("id")).isEqualTo(true) + assertThat(tbl1.getPartitionKeys.contains("pt")).isEqualTo(true) + + sql( + s""" + |CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string) + |PARTITIONED BY (pt1, pt2) + |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1") + |""".stripMargin) + + val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get() + assertThatList(tbl2.getPrimaryKeys).hasSameElementsAs( + Seq("pk1", "pk2", "pt1", "pt2").toList.asJava) + assertThat(tbl2.getNumBuckets).isEqualTo(3) + assertThatList(tbl2.getBucketKeys).hasSameElementsAs(Seq("pk1").toList.asJava) + } + + test("Catalog: check namespace and table created by admin") { + val dbName = "db_by_fluss_admin" + val tblName = "tbl_by_fluss_admin" + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[SparkCatalog] + + // check namespace + val dbDesc = DatabaseDescriptor.builder().comment("created by admin").build() + admin.createDatabase(dbName, dbDesc, true).get() + assert(catalog.namespaceExists(Array(dbName))) + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(dbName) :: Nil) + + // check table + val tablePath = TablePath.of(dbName, tblName) + val rt = RowType + .builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("pt", DataTypes.STRING()) + .build() + val tableDesc = TableDescriptor + .builder() + .schema(Schema.newBuilder().fromRowType(rt).build()) + .partitionedBy("pt") + .build() + admin.createTable(tablePath, tableDesc, false).get() + assert( + catalog.tableExists(Identifier.of(Array(tablePath.getDatabaseName), tablePath.getTableName))) + val expectDescTable = Seq( + Row("id", "int", null), + Row("name", "string", null), + Row("pt", "string", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("pt", "string", null) + ) + checkAnswer( + sql(s"DESC $dbName.$tblName"), + expectDescTable + ) + + admin.dropTable(tablePath, true).get() + checkAnswer(sql(s"SHOW TABLES IN $dbName"), Nil) + + admin.dropDatabase(dbName, true, true).get() + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + } +} diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala new file mode 100644 index 0000000000..f1460303a1 --- /dev/null +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.lake.paimon + +import org.apache.fluss.client.{Connection, ConnectionFactory} +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.exception.FlussRuntimeException +import org.apache.fluss.flink.tiering.LakeTieringJobBuilder +import org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL +import org.apache.fluss.metadata.DataLakeFormat +import org.apache.fluss.server.testutils.FlussClusterExtension +import org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties +import org.apache.fluss.spark.SparkCatalog + +import org.apache.flink.api.common.RuntimeExecutionMode +import org.apache.flink.core.execution.JobClient +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory} +import org.apache.paimon.options.Options +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +import java.nio.file.Files +import java.time.Duration +import java.util + +class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { + + protected val DEFAULT_CATALOG = "fluss_catalog" + protected val DEFAULT_DATABASE = "fluss" + + protected var conn: Connection = _ + protected var admin: Admin = _ + + private var execEnv: StreamExecutionEnvironment = _ + + val flussServer: FlussClusterExtension = + FlussClusterExtension.builder + .setClusterConf(flussConf) + .setNumOfTabletServers(3) + .build + + protected var paimonCatalog: Catalog = _ + protected var warehousePath: String = _ + + protected def flussConf: Configuration = { + val conf = new Configuration + conf + .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + conf.setInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE) + conf.setString("datalake.format", "paimon") + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + try { + warehousePath = + Files.createTempDirectory("fluss-testing-datalake-tiered").resolve("warehouse").toString + } catch { + case e: Exception => + throw new FlussRuntimeException("Failed to create warehouse path", e) + } + conf.setString("datalake.paimon.warehouse", warehousePath) + paimonCatalog = CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))) + conf + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + flussServer.start() + conn = ConnectionFactory.createConnection(flussServer.getClientConfig) + admin = conn.getAdmin + + spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName) + spark.conf.set( +
s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", + flussServer.getBootstrapServers) + spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG) + + sql(s"USE $DEFAULT_DATABASE") + + execEnv = StreamExecutionEnvironment.getExecutionEnvironment + execEnv.setParallelism(2) + execEnv.enableCheckpointing(1000) + } + + override protected def afterAll(): Unit = { + super.afterAll() + + if (admin != null) { + admin.close() + admin = null + } + if (conn != null) { + conn.close() + conn = null + } + flussServer.close() + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + + execEnv = StreamExecutionEnvironment.getExecutionEnvironment + execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) + execEnv.setParallelism(2) + execEnv = StreamExecutionEnvironment.getExecutionEnvironment + } + + protected def buildTieringJob(execEnv: StreamExecutionEnvironment): JobClient = { + val flussConfig = new Configuration(flussServer.getClientConfig) + flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)) + LakeTieringJobBuilder + .newBuilder( + execEnv, + flussConfig, + Configuration.fromMap(getPaimonCatalogConf), + new Configuration, + DataLakeFormat.PAIMON.toString) + .build + } + + protected def getPaimonCatalogConf: util.Map[String, String] = { + val paimonConf = new util.HashMap[String, String] + paimonConf.put("metastore", "filesystem") + paimonConf.put("warehouse", warehousePath) + paimonConf + } +} diff --git a/fluss-spark/fluss-spark-lake/pom.xml b/fluss-spark/fluss-spark-lake/pom.xml new file mode 100644 index 0000000000..b960a6f5e7 --- /dev/null +++ b/fluss-spark/fluss-spark-lake/pom.xml @@ -0,0 +1,59 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.9-SNAPSHOT + + + fluss-spark-lake + Fluss : Engine Spark : Lake + pom + + + fluss-spark-lake-paimon + + + + 1.20 + 1.20.1 + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + + diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml index 7773528096..f0c625afd1 100644 --- a/fluss-spark/pom.xml +++ b/fluss-spark/pom.xml @@ -36,6 +36,7 @@ fluss-spark-ut fluss-spark-3.5 fluss-spark-3.4 + fluss-spark-lake From 0cb79500599d2bde67d039d992dacb0caf4085b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 21 Jan 2026 16:30:02 +0800 Subject: [PATCH 2/4] add lake paimon ddl ut --- .../org/apache/fluss/spark/SparkCatalog.scala | 2 +- .../apache/fluss/spark/SparkConversions.scala | 6 +- .../paimon/SparkLakePaimonCatalogTest.scala | 310 +++++++++++++++++- .../lake/paimon/SparkLakePaimonTestBase.scala | 25 ++ 4 files changed, 326 insertions(+), 17 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 842ef9b395..300d36904b 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -74,7 +74,7 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) { throw new TableAlreadyExistsException(ident) } else { - throw new RuntimeException(e) + throw e } } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala index 
f85e33f81e..fe903f36c4 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -54,7 +54,7 @@ object SparkConversions { tableDescriptorBuilder.partitionedBy(partitionKey: _*) val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { - val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim) schemaBuilder.primaryKey(pks: _*) pks } else { @@ -64,7 +64,7 @@ object SparkConversions { if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { - caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim) } else { primaryKeys.filterNot(partitionKey.contains) } @@ -76,7 +76,7 @@ object SparkConversions { } val (tableProps, customProps) = - caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition { + caseInsensitiveProps.filterNot(e => SPARK_TABLE_OPTIONS.contains(e._1)).partition { case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX) } diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala index deb14fce82..45d2b53214 100644 --- a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala @@ -17,11 +17,12 @@ package org.apache.fluss.spark.lake.paimon -import org.apache.fluss.config.ConfigOptions +import org.apache.fluss.config.{ConfigOptions, FlussConfigUtils} +import org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS import org.apache.fluss.metadata._ import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME} import org.apache.fluss.spark.SparkCatalog -import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER} +import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER, PRIMARY_KEY} import org.apache.fluss.types.{DataTypes, RowType} import org.apache.paimon.CoreOptions @@ -29,12 +30,14 @@ import org.apache.paimon.table.Table import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.Identifier import org.assertj.core.api.Assertions.{assertThat, assertThatList} +import org.scalatest.matchers.must.Matchers.contain +import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper} import scala.collection.JavaConverters._ class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { - private def verifyPaimonTable( + private def verifyLakePaimonTable( paimonTable: Table, flussTable: TableInfo, expectedPaimonRowType: org.apache.paimon.types.RowType, @@ -43,39 +46,320 @@ class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { if (!flussTable.hasPrimaryKey) { assert(paimonTable.primaryKeys.isEmpty) } else { - assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "primary key")( + 
assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "check primary key")( paimonTable.primaryKeys()) } if (flussTable.isPartitioned) { - assertResult(flussTable.getPartitionKeys, "partition keys")(paimonTable.partitionKeys()) + assertResult(flussTable.getPartitionKeys, "check partition keys")(paimonTable.partitionKeys()) } assert(flussTable.getNumBuckets == bucketNum) - assert(paimonTable.options().get(CoreOptions.BUCKET.key()).toInt == bucketNum) + + if (expectedBucketKey != null) { + assert( + paimonTable + .options() + .asScala + .getOrElse(CoreOptions.BUCKET.key(), CoreOptions.BUCKET.defaultValue().toString) + .toInt == bucketNum) + } + if (flussTable.hasBucketKey) { - assertResult(expectedBucketKey)(flussTable.getBucketKeys.asScala.mkString(",")) - assertResult(expectedBucketKey)(paimonTable.options().get(CoreOptions.BUCKET_KEY.key())) + assertResult(expectedBucketKey, "check fluss table bucket key")( + flussTable.getBucketKeys.asScala.mkString(",")) + assertResult(expectedBucketKey, "check paimon table bucket key")( + paimonTable.options().get(CoreOptions.BUCKET_KEY.key())) } + val expectedProperties = + (flussTable.getProperties.toMap.asScala ++ flussTable.getCustomProperties.toMap.asScala) + .filterNot(_._1.startsWith(FlussConfigUtils.TABLE_PREFIX + "datalake.")) + .map { + case (k, v) => + if (k.startsWith("paimon.")) { + (k.substring("paimon.".length), v) + } else { + (s"fluss.$k", v) + } + } + .toMap + paimonTable.options().asScala.should(contain).allElementsOf(expectedProperties) + val paimonRowType = paimonTable.rowType - assert(paimonRowType.equals(expectedPaimonRowType)) - assert(flussTable.getComment.equals(paimonTable.comment())) + assert(paimonRowType.getFieldCount == expectedPaimonRowType.getFieldCount) + paimonRowType.getFields.asScala.zip(expectedPaimonRowType.getFields.asScala).foreach { + case (actual, expect) => + assert(actual.equalsIgnoreFieldId(expect), s"check table schema: $actual vs $expect") + } + + assert(flussTable.getComment.equals(paimonTable.comment()), "check table comments") } test("Lake Catalog: basic table") { - withTable("t") { + // bucket log table + var tableName = "bucket_log_table" + withTable(tableName) { sql(s""" - |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // non-bucket log table + tableName = "non_bucket_log_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + 
|""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + + // pk table + tableName = "pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "id", + "name", + "pk1", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // partitioned pk table + tableName = "partitioned_pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string, pt1 string) + | PARTITIONED BY (pt1) | TBLPROPERTIES ( | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id, pt1', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "id", + "name", + "pk1", + "pt1", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + } + + test("Lake Catalog: table with all types") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t + | (c1 boolean, c2 byte, c3 short, c4 int, c5 long, c6 float, c7 double, c8 date, + | c9 timestamp, c10 timestamp_ntz, c11 string, c12 binary, c13 decimal(10, 2), + | c14 array, c15 struct, c16 map) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, | 
'${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') |""".stripMargin) val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() val paimonTable = paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) - verifyPaimonTable( + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.BOOLEAN, + org.apache.paimon.types.DataTypes.TINYINT, + org.apache.paimon.types.DataTypes.SMALLINT, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.FLOAT, + org.apache.paimon.types.DataTypes.DOUBLE, + org.apache.paimon.types.DataTypes.DATE, + org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + org.apache.paimon.types.DataTypes.TIMESTAMP, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.BYTES, + org.apache.paimon.types.DataTypes.DECIMAL(10, 2), + org.apache.paimon.types.DataTypes.ARRAY(org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes.ROW( + org.apache.paimon.types.DataTypes + .FIELD(0, "a", org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes + .FIELD(1, "b", org.apache.paimon.types.DataTypes.STRING) + ), + org.apache.paimon.types.DataTypes.MAP( + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "c1", + "c2", + "c3", + "c4", + "c5", + "c6", + "c7", + "c8", + "c9", + "c10", + "c11", + "c12", + "c13", + "c14", + "c15", + "c16", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + } + + test("Lake Catalog: unsettable properties") { + withTable("t") { + val unsettableProperties = + PAIMON_UNSETTABLE_OPTIONS.asScala.map(e => s"'$e' = 'v'").mkString(", ") + + intercept[java.util.concurrent.ExecutionException] { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | $unsettableProperties, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + }.getCause shouldBe a[org.apache.fluss.exception.InvalidConfigException] + } + } + + test("Lake Catalog: alter table with lake enabled") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = false, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + intercept[org.apache.paimon.catalog.Catalog.TableNotExistException] { + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + } + + sql(s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES ('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)") + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + verifyLakePaimonTable( paimonTable, flussTable, org.apache.paimon.types.RowType diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala 
b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala index f1460303a1..26d1749f66 100644 --- a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala @@ -117,12 +117,37 @@ class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { override protected def beforeEach(): Unit = { super.beforeEach() + conn = ConnectionFactory.createConnection(flussServer.getClientConfig) + admin = conn.getAdmin + execEnv = StreamExecutionEnvironment.getExecutionEnvironment execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) execEnv.setParallelism(2) } + override protected def afterEach(): Unit = { + super.afterEach() + + if (admin != null) { + admin.close() + admin = null + } + if (conn != null) { + conn.close() + conn = null + } + } + + override protected def withTable(tableNames: String*)(f: => Unit): Unit = { + super.withTable(tableNames: _*)(f) + tableNames.foreach( + tableName => + paimonCatalog.dropTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName), + true)) + } + protected def buildTieringJob(execEnv: StreamExecutionEnvironment): JobClient = { val flussConfig = new Configuration(flussServer.getClientConfig) flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)) From cdb87f19461e41625253c74482f48a719692e911 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 21 Jan 2026 16:46:38 +0800 Subject: [PATCH 3/4] support setting table properties --- .../org/apache/fluss/spark/SparkCatalog.scala | 19 ++- .../apache/fluss/spark/SparkConversions.scala | 10 ++ .../paimon/SparkLakePaimonCatalogTest.scala | 122 +----------------- 3 files changed, 33 insertions(+), 118 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 300d36904b..6142efa0ef 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -80,7 +80,24 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } override def alterTable(ident: Identifier, changes: TableChange*): Table = { - throw new UnsupportedOperationException("Altering table is not supported") + if (changes.size != 1 || !changes.head.isInstanceOf[TableChange.SetProperty]) { + throw new IllegalArgumentException("Altering table only supports setting a single property for now") + } + try { + admin + .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false) + .get() + loadTable(ident) + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[TableNotExistException]) { + throw new NoSuchTableException(ident) + } else { + throw e + } + case e: UnsupportedOperationException => + throw new IllegalArgumentException(e) + } } override def dropTable(ident: Identifier): Boolean = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala index fe903f36c4..61e77789b3
100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType import org.apache.spark.sql.FlussIdentityTransform import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -97,4 +98,13 @@ object SparkConversions { } partitionKeys.toArray } + + def toFlussTableChanges(changes: Seq[TableChange]): Seq[org.apache.fluss.metadata.TableChange] = { + changes.map { + case e: TableChange.SetProperty => + org.apache.fluss.metadata.TableChange.set(e.property(), e.value()) + // TODO Add full support for table changes + case _ => throw new UnsupportedOperationException("Unsupported table change") + } + } } diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala index 45d2b53214..e723f33167 100644 --- a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala +++ b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala @@ -21,15 +21,10 @@ import org.apache.fluss.config.{ConfigOptions, FlussConfigUtils} import org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS import org.apache.fluss.metadata._ import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME} -import org.apache.fluss.spark.SparkCatalog import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER, PRIMARY_KEY} -import org.apache.fluss.types.{DataTypes, RowType} import org.apache.paimon.CoreOptions import org.apache.paimon.table.Table -import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.Identifier -import org.assertj.core.api.Assertions.{assertThat, assertThatList} import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper} @@ -337,7 +332,7 @@ class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { | $unsettableProperties, | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') |""".stripMargin) - }.getCause shouldBe a[org.apache.fluss.exception.InvalidConfigException] + }.getCause.shouldBe(a[org.apache.fluss.exception.InvalidConfigException]) } } @@ -353,12 +348,12 @@ class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) } - sql(s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES ('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)") + sql( + s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES ('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)") val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() val paimonTable = - paimonCatalog.getTable( - org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) verifyLakePaimonTable( paimonTable, 
flussTable, @@ -373,117 +368,10 @@ class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { ), Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) ), - "id", + null, 2 ) } } - test("Catalog: show tables") { - withTable("test_tbl", "test_tbl1", "tbl_a") { - sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) COMMENT 'my test table'") - sql( - s"CREATE TABLE $DEFAULT_DATABASE.test_tbl1 (id int, name string) COMMENT 'my test table1'") - sql(s"CREATE TABLE $DEFAULT_DATABASE.tbl_a (id int, name string) COMMENT 'my table a'") - - checkAnswer( - sql("SHOW TABLES"), - Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( - "fluss", - "tbl_a", - false) :: Nil) - - checkAnswer( - sql(s"SHOW TABLES in $DEFAULT_DATABASE"), - Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( - "fluss", - "tbl_a", - false) :: Nil) - - checkAnswer( - sql(s"SHOW TABLES from $DEFAULT_DATABASE"), - Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Row( - "fluss", - "tbl_a", - false) :: Nil) - - checkAnswer( - sql(s"SHOW TABLES from $DEFAULT_DATABASE like 'test_*'"), - Row("fluss", "test_tbl", false) :: Row("fluss", "test_tbl1", false) :: Nil) - } - } - - test("Catalog: primary-key table") { - sql(s""" - |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string) - |PARTITIONED BY (pt) - |TBLPROPERTIES("primary.key" = "id,pt") - |""".stripMargin) - - val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() - assertThatList(tbl1.getPrimaryKeys).hasSameElementsAs(Seq("id", "pt").toList.asJava) - assertThat(tbl1.getNumBuckets).isEqualTo(1) - assertThat(tbl1.getBucketKeys.contains("id")).isEqualTo(true) - assertThat(tbl1.getPartitionKeys.contains("pt")).isEqualTo(true) - - sql( - s""" - |CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string) - |PARTITIONED BY (pt1, pt2) - |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1") - |""".stripMargin) - - val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get() - assertThatList(tbl2.getPrimaryKeys).hasSameElementsAs( - Seq("pk1", "pk2", "pt1", "pt2").toList.asJava) - assertThat(tbl2.getNumBuckets).isEqualTo(3) - assertThatList(tbl2.getBucketKeys).hasSameElementsAs(Seq("pk1").toList.asJava) - } - - test("Catalog: check namespace and table created by admin") { - val dbName = "db_by_fluss_admin" - val tblName = "tbl_by_fluss_admin" - val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[SparkCatalog] - - // check namespace - val dbDesc = DatabaseDescriptor.builder().comment("created by admin").build() - admin.createDatabase(dbName, dbDesc, true).get() - assert(catalog.namespaceExists(Array(dbName))) - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(dbName) :: Nil) - - // check table - val tablePath = TablePath.of(dbName, tblName) - val rt = RowType - .builder() - .field("id", DataTypes.INT()) - .field("name", DataTypes.STRING()) - .field("pt", DataTypes.STRING()) - .build() - val tableDesc = TableDescriptor - .builder() - .schema(Schema.newBuilder().fromRowType(rt).build()) - .partitionedBy("pt") - .build() - admin.createTable(tablePath, tableDesc, false).get() - assert( - catalog.tableExists(Identifier.of(Array(tablePath.getDatabaseName), tablePath.getTableName))) - val expectDescTable = Seq( - Row("id", "int", null), - Row("name", "string", null), - Row("pt", "string", null), - 
Row("# Partition Information", "", ""), - Row("# col_name", "data_type", "comment"), - Row("pt", "string", null) - ) - checkAnswer( - sql(s"DESC $dbName.$tblName"), - expectDescTable - ) - - admin.dropTable(tablePath, true).get() - checkAnswer(sql(s"SHOW TABLES IN $dbName"), Nil) - - admin.dropDatabase(dbName, true, true).get() - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) - } } From bffc86c7ef467eb99d157a94fbb08785c55576ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 21 Jan 2026 20:57:36 +0800 Subject: [PATCH 4/4] move to fluss-spark-ut --- .../fluss-spark-lake-paimon/pom.xml | 201 ------------------ fluss-spark/fluss-spark-lake/pom.xml | 59 ----- fluss-spark/fluss-spark-ut/pom.xml | 18 ++ ...e.fluss.lake.lakestorage.LakeStoragePlugin | 0 .../paimon/SparkLakePaimonCatalogTest.scala | 0 .../lake/paimon/SparkLakePaimonTestBase.scala | 38 ---- fluss-spark/pom.xml | 1 - 7 files changed, 18 insertions(+), 299 deletions(-) delete mode 100644 fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml delete mode 100644 fluss-spark/fluss-spark-lake/pom.xml rename fluss-spark/{fluss-spark-lake/fluss-spark-lake-paimon => fluss-spark-ut}/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin (100%) rename fluss-spark/{fluss-spark-lake/fluss-spark-lake-paimon => fluss-spark-ut}/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala (100%) rename fluss-spark/{fluss-spark-lake/fluss-spark-lake-paimon => fluss-spark-ut}/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala (74%) diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml b/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml deleted file mode 100644 index bf285b599d..0000000000 --- a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/pom.xml +++ /dev/null @@ -1,201 +0,0 @@ - - - - - 4.0.0 - - org.apache.fluss - fluss-spark-lake - 0.9-SNAPSHOT - - - fluss-spark-lake-paimon_${scala.binary.version} - Fluss : Engine Spark : Lake : Paimon - - - 1.3.1 - test - - - - - org.apache.fluss - fluss-spark-common_${scala.binary.version} - ${project.version} - test - - - - org.apache.fluss - fluss-server - ${project.version} - test - - - - org.apache.fluss - fluss-server - ${project.version} - tests - test - - - - org.apache.fluss - fluss-client - ${project.version} - test - - - - org.apache.fluss - fluss-common - ${project.version} - tests - test - - - - org.apache.fluss - fluss-lake-paimon - ${project.version} - - - - - org.eclipse.collections - eclipse-collections - 11.1.0 - test - - - - org.apache.fluss - fluss-test-utils - - - - - org.apache.curator - curator-test - ${curator.version} - test - - - - org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} - tests - test - - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} - tests - test - - - - org.apache.spark - spark-catalyst_${scala.binary.version} - ${spark.version} - tests - test - - - - junit - junit - 3.8.1 - test - - - - org.apache.flink - flink-core - ${flink.version} - ${tiering.deps.scope} - - - org.apache.flink - flink-table-common - ${flink.version} - ${tiering.deps.scope} - - - org.apache.flink - flink-table-runtime - ${flink.version} - ${tiering.deps.scope} - - - org.apache.flink - flink-connector-base - ${flink.version} - ${tiering.deps.scope} - - - org.apache.flink - flink-connector-files - ${flink.version} - ${tiering.deps.scope} - - - org.apache.flink - flink-table-test-utils 
- ${flink.version} - ${tiering.deps.scope} - - - org.apache.fluss - fluss-flink-${flink.major.version} - ${project.version} - ${tiering.deps.scope} - - - org.apache.paimon - paimon-bundle - ${paimon.version} - ${tiering.deps.scope} - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - prepare-test-jar - test-compile - - test-jar - - - - - - - diff --git a/fluss-spark/fluss-spark-lake/pom.xml b/fluss-spark/fluss-spark-lake/pom.xml deleted file mode 100644 index b960a6f5e7..0000000000 --- a/fluss-spark/fluss-spark-lake/pom.xml +++ /dev/null @@ -1,59 +0,0 @@ - - - - - 4.0.0 - - org.apache.fluss - fluss-spark - 0.9-SNAPSHOT - - - fluss-spark-lake - Fluss : Engine Spark : Lake - pom - - - fluss-spark-lake-paimon - - - - 1.20 - 1.20.1 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - prepare-test-jar - test-compile - - test-jar - - - - - - - diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml index ffa424d261..2e02c4d456 100644 --- a/fluss-spark/fluss-spark-ut/pom.xml +++ b/fluss-spark/fluss-spark-ut/pom.xml @@ -30,6 +30,10 @@ fluss-spark-ut_${scala.binary.version} Fluss : Engine Spark : UT + + 1.3.1 + + org.apache.fluss @@ -68,6 +72,20 @@ test + + org.apache.fluss + fluss-lake-paimon + ${project.version} + test + + + + org.apache.paimon + paimon-bundle + ${paimon.version} + test + + org.eclipse.collections diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin similarity index 100% rename from fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin rename to fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala similarity index 100% rename from fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala diff --git a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala similarity index 74% rename from fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala index 26d1749f66..4f1aadf31c 100644 --- a/fluss-spark/fluss-spark-lake/fluss-spark-lake-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala @@ -21,16 +21,10 @@ import org.apache.fluss.client.{Connection, ConnectionFactory} import org.apache.fluss.client.admin.Admin import org.apache.fluss.config.{ConfigOptions, Configuration} import 
org.apache.fluss.exception.FlussRuntimeException -import org.apache.fluss.flink.tiering.LakeTieringJobBuilder -import org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL -import org.apache.fluss.metadata.DataLakeFormat import org.apache.fluss.server.testutils.FlussClusterExtension import org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties import org.apache.fluss.spark.SparkCatalog -import org.apache.flink.api.common.RuntimeExecutionMode -import org.apache.flink.core.execution.JobClient -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory} import org.apache.paimon.options.Options import org.apache.spark.sql.QueryTest @@ -38,7 +32,6 @@ import org.apache.spark.sql.test.SharedSparkSession import java.nio.file.Files import java.time.Duration -import java.util class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { @@ -48,8 +41,6 @@ class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { protected var conn: Connection = _ protected var admin: Admin = _ - private var execEnv: StreamExecutionEnvironment = _ - val flussServer: FlussClusterExtension = FlussClusterExtension.builder .setClusterConf(flussConf) @@ -94,10 +85,6 @@ class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG) sql(s"USE $DEFAULT_DATABASE") - - execEnv = StreamExecutionEnvironment.getExecutionEnvironment - execEnv.setParallelism(2) - execEnv.enableCheckpointing(1000) } override protected def afterAll(): Unit = { @@ -119,11 +106,6 @@ class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { conn = ConnectionFactory.createConnection(flussServer.getClientConfig) admin = conn.getAdmin - - execEnv = StreamExecutionEnvironment.getExecutionEnvironment - execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) - execEnv.setParallelism(2) } override protected def afterEach(): Unit = { @@ -147,24 +129,4 @@ class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName), true)) } - - protected def buildTieringJob(execEnv: StreamExecutionEnvironment): JobClient = { - val flussConfig = new Configuration(flussServer.getClientConfig) - flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)) - LakeTieringJobBuilder - .newBuilder( - execEnv, - flussConfig, - Configuration.fromMap(getPaimonCatalogConf), - new Configuration, - DataLakeFormat.PAIMON.toString) - .build - } - - protected def getPaimonCatalogConf: util.Map[String, String] = { - val paimonConf = new util.HashMap[String, String] - paimonConf.put("metastore", "filesystem") - paimonConf.put("warehouse", warehousePath) - paimonConf - } -} diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml index f0c625afd1..7773528096 100644 --- a/fluss-spark/pom.xml +++ b/fluss-spark/pom.xml @@ -36,7 +36,6 @@ fluss-spark-ut fluss-spark-3.5 fluss-spark-3.4 - fluss-spark-lake