diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 842ef9b395..6142efa0ef 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -74,13 +74,30 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) { throw new TableAlreadyExistsException(ident) } else { - throw new RuntimeException(e) + throw e } } } override def alterTable(ident: Identifier, changes: TableChange*): Table = { - throw new UnsupportedOperationException("Altering table is not supported") + if (changes.size != 1 || !changes.head.isInstanceOf[TableChange.SetProperty]) { + throw new IllegalArgumentException("Altering table only supports setting properties for now") + } + try { + admin + .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false) + .get() + loadTable(ident) + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[TableNotExistException]) { + throw new NoSuchTableException(ident) + } else { + throw e + } + case e: UnsupportedOperationException => + throw new IllegalArgumentException(e) + } } override def dropTable(ident: Identifier): Boolean = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala index f85e33f81e..61e77789b3 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType import org.apache.spark.sql.FlussIdentityTransform import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -54,7 +55,7 @@ object SparkConversions { tableDescriptorBuilder.partitionedBy(partitionKey: _*) val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { - val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim) schemaBuilder.primaryKey(pks: _*) pks } else { @@ -64,7 +65,7 @@ object SparkConversions { if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { - caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim) } else { primaryKeys.filterNot(partitionKey.contains) } @@ -76,7 +77,7 @@ object SparkConversions { } val (tableProps, customProps) = - caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition { + caseInsensitiveProps.filterNot(e => SPARK_TABLE_OPTIONS.contains(e._1)).partition { case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX) } @@ -97,4 +98,13 @@ object SparkConversions { } partitionKeys.toArray } + + def toFlussTableChanges(changes: Seq[TableChange]): Seq[org.apache.fluss.metadata.TableChange] = { + changes.map { + case e: TableChange.SetProperty => 
+        org.apache.fluss.metadata.TableChange.set(e.property(), e.value())
+      // TODO Add full support for table changes
+      case _ => throw new UnsupportedOperationException("Unsupported table change")
+    }
+  }
 }
diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml
index ffa424d261..2e02c4d456 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -30,6 +30,10 @@
     <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
     <name>Fluss : Engine Spark : UT</name>
 
+    <properties>
+        <paimon.version>1.3.1</paimon.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.fluss</groupId>
@@ -68,6 +72,20 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-lake-paimon</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.eclipse.collections</groupId>
diff --git a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 0000000000..69bf0f8a4b
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
new file mode 100644
index 0000000000..e723f33167
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.fluss.spark.lake.paimon + +import org.apache.fluss.config.{ConfigOptions, FlussConfigUtils} +import org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS +import org.apache.fluss.metadata._ +import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME} +import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER, PRIMARY_KEY} + +import org.apache.paimon.CoreOptions +import org.apache.paimon.table.Table +import org.scalatest.matchers.must.Matchers.contain +import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper} + +import scala.collection.JavaConverters._ + +class SparkLakePaimonCatalogTest extends SparkLakePaimonTestBase { + + private def verifyLakePaimonTable( + paimonTable: Table, + flussTable: TableInfo, + expectedPaimonRowType: org.apache.paimon.types.RowType, + expectedBucketKey: String, + bucketNum: Int): Unit = { + if (!flussTable.hasPrimaryKey) { + assert(paimonTable.primaryKeys.isEmpty) + } else { + assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "check primary key")( + paimonTable.primaryKeys()) + } + + if (flussTable.isPartitioned) { + assertResult(flussTable.getPartitionKeys, "check partition keys")(paimonTable.partitionKeys()) + } + + assert(flussTable.getNumBuckets == bucketNum) + + if (expectedBucketKey != null) { + assert( + paimonTable + .options() + .asScala + .getOrElse(CoreOptions.BUCKET.key(), CoreOptions.BUCKET.defaultValue().toString) + .toInt == bucketNum) + } + + if (flussTable.hasBucketKey) { + assertResult(expectedBucketKey, "check fluss table bucket key")( + flussTable.getBucketKeys.asScala.mkString(",")) + assertResult(expectedBucketKey, "check paimon table bucket key")( + paimonTable.options().get(CoreOptions.BUCKET_KEY.key())) + } + + val expectedProperties = + (flussTable.getProperties.toMap.asScala ++ flussTable.getCustomProperties.toMap.asScala) + .filterNot(_._1.startsWith(FlussConfigUtils.TABLE_PREFIX + "datalake.")) + .map { + case (k, v) => + if (k.startsWith("paimon.")) { + (k.substring("paimon.".length), v) + } else { + (s"fluss.$k", v) + } + } + .toMap + paimonTable.options().asScala.should(contain).allElementsOf(expectedProperties) + + val paimonRowType = paimonTable.rowType + assert(paimonRowType.getFieldCount == expectedPaimonRowType.getFieldCount) + paimonRowType.getFields.asScala.zip(expectedPaimonRowType.getFields.asScala).foreach { + case (actual, expect) => + assert(actual.equalsIgnoreFieldId(expect), s"check table schema: $actual vs $expect") + } + + assert(flussTable.getComment.equals(paimonTable.comment()), "check table comments") + } + + test("Lake Catalog: basic table") { + // bucket log table + var tableName = "bucket_log_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, 
+ org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // non-bucket log table + tableName = "non_bucket_log_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + + // pk table + tableName = "pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "id", + "name", + "pk1", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // partitioned pk table + tableName = "partitioned_pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string, pt1 string) + | PARTITIONED BY (pt1) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id, pt1', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + 
Array.apply(
+            "id",
+            "name",
+            "pk1",
+            "pt1",
+            BUCKET_COLUMN_NAME,
+            OFFSET_COLUMN_NAME,
+            TIMESTAMP_COLUMN_NAME)
+        ),
+        "id",
+        2
+      )
+    }
+  }
+
+  test("Lake Catalog: table with all types") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t
+             | (c1 boolean, c2 byte, c3 short, c4 int, c5 long, c6 float, c7 double, c8 date,
+             | c9 timestamp, c10 timestamp_ntz, c11 string, c12 binary, c13 decimal(10, 2),
+             | c14 array<int>, c15 struct<a: int, b: string>, c16 map<string, int>)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get()
+      val paimonTable =
+        paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t"))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.BOOLEAN,
+              org.apache.paimon.types.DataTypes.TINYINT,
+              org.apache.paimon.types.DataTypes.SMALLINT,
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.FLOAT,
+              org.apache.paimon.types.DataTypes.DOUBLE,
+              org.apache.paimon.types.DataTypes.DATE,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+              org.apache.paimon.types.DataTypes.TIMESTAMP,
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.BYTES,
+              org.apache.paimon.types.DataTypes.DECIMAL(10, 2),
+              org.apache.paimon.types.DataTypes.ARRAY(org.apache.paimon.types.DataTypes.INT),
+              org.apache.paimon.types.DataTypes.ROW(
+                org.apache.paimon.types.DataTypes
+                  .FIELD(0, "a", org.apache.paimon.types.DataTypes.INT),
+                org.apache.paimon.types.DataTypes
+                  .FIELD(1, "b", org.apache.paimon.types.DataTypes.STRING)
+              ),
+              org.apache.paimon.types.DataTypes.MAP(
+                org.apache.paimon.types.DataTypes.STRING.notNull(),
+                org.apache.paimon.types.DataTypes.INT),
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply(
+              "c1",
+              "c2",
+              "c3",
+              "c4",
+              "c5",
+              "c6",
+              "c7",
+              "c8",
+              "c9",
+              "c10",
+              "c11",
+              "c12",
+              "c13",
+              "c14",
+              "c15",
+              "c16",
+              BUCKET_COLUMN_NAME,
+              OFFSET_COLUMN_NAME,
+              TIMESTAMP_COLUMN_NAME)
+          ),
+        null,
+        2
+      )
+    }
+  }
+
+  test("Lake Catalog: unsettable properties") {
+    withTable("t") {
+      val unsettableProperties =
+        PAIMON_UNSETTABLE_OPTIONS.asScala.map(e => s"'$e' = 'v'").mkString(", ")
+
+      intercept[java.util.concurrent.ExecutionException] {
+        sql(s"""
+               |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+               | TBLPROPERTIES (
+               | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+               | $unsettableProperties,
+               | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+               |""".stripMargin)
+      }.getCause.shouldBe(a[org.apache.fluss.exception.InvalidConfigException])
+    }
+  }
+
+  test("Lake Catalog: alter table with lake enabled") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = false,
+             | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+      intercept[org.apache.paimon.catalog.Catalog.TableNotExistException] {
+        paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t"))
+      }
+
+      sql(
+        s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES 
('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)") + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() + val paimonTable = + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + } + +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala new file mode 100644 index 0000000000..4f1aadf31c --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonTestBase.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.lake.paimon + +import org.apache.fluss.client.{Connection, ConnectionFactory} +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.exception.FlussRuntimeException +import org.apache.fluss.server.testutils.FlussClusterExtension +import org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties +import org.apache.fluss.spark.SparkCatalog + +import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory} +import org.apache.paimon.options.Options +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +import java.nio.file.Files +import java.time.Duration + +class SparkLakePaimonTestBase extends QueryTest with SharedSparkSession { + + protected val DEFAULT_CATALOG = "fluss_catalog" + protected val DEFAULT_DATABASE = "fluss" + + protected var conn: Connection = _ + protected var admin: Admin = _ + + val flussServer: FlussClusterExtension = + FlussClusterExtension.builder + .setClusterConf(flussConf) + .setNumOfTabletServers(3) + .build + + protected var paimonCatalog: Catalog = _ + protected var warehousePath: String = _ + + protected def flussConf: Configuration = { + val conf = new Configuration + conf + .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + conf.setInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE) + conf.setString("datalake.format", "paimon") + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + try { + warehousePath = + Files.createTempDirectory("fluss-testing-datalake-tiered").resolve("warehouse").toString + } catch { + case e: Exception => + throw new FlussRuntimeException("Failed to create warehouse path") + } + conf.setString("datalake.paimon.warehouse", warehousePath) + paimonCatalog = CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))) + conf + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + flussServer.start() + conn = ConnectionFactory.createConnection(flussServer.getClientConfig) + admin = conn.getAdmin + + spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName) + spark.conf.set( + s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", + flussServer.getBootstrapServers) + spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG) + + sql(s"USE $DEFAULT_DATABASE") + } + + override protected def afterAll(): Unit = { + super.afterAll() + + if (admin != null) { + admin.close() + admin = null + } + if (conn != null) { + conn.close() + conn = null + } + flussServer.close() + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + + conn = ConnectionFactory.createConnection(flussServer.getClientConfig) + admin = conn.getAdmin + } + + override protected def afterEach(): Unit = { + super.afterEach() + + if (admin != null) { + admin.close() + admin = null + } + if (conn != null) { + conn.close() + conn = null + } + } + + override protected def withTable(tableNames: String*)(f: => Unit): Unit = { + super.withTable(tableNames: _*)(f) + tableNames.foreach( + tableName => + paimonCatalog.dropTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName), + true)) + } +}