diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 842ef9b395..982aee6a70 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -74,13 +74,34 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) { throw new TableAlreadyExistsException(ident) } else { - throw new RuntimeException(e) + throw e } } } override def alterTable(ident: Identifier, changes: TableChange*): Table = { - throw new UnsupportedOperationException("Altering table is not supported") + if ( + !changes.forall( + e => e.isInstanceOf[TableChange.SetProperty] || e.isInstanceOf[TableChange.RemoveProperty]) + ) { + throw new IllegalArgumentException( + "Altering table only supports set or remove properties for now") + } + try { + admin + .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false) + .get() + loadTable(ident) + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[TableNotExistException]) { + throw new NoSuchTableException(ident) + } else { + throw e + } + case e: UnsupportedOperationException => + throw new IllegalArgumentException(e) + } } override def dropTable(ident: Identifier): Boolean = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala index f85e33f81e..eeb726678d 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType import org.apache.spark.sql.FlussIdentityTransform import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -54,7 +55,7 @@ object SparkConversions { tableDescriptorBuilder.partitionedBy(partitionKey: _*) val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { - val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim) schemaBuilder.primaryKey(pks: _*) pks } else { @@ -64,7 +65,7 @@ object SparkConversions { if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { - caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim) } else { primaryKeys.filterNot(partitionKey.contains) } @@ -76,7 +77,7 @@ object SparkConversions { } val (tableProps, customProps) = - caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition { + caseInsensitiveProps.filterNot(e => SPARK_TABLE_OPTIONS.contains(e._1)).partition { case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX) } @@ -97,4 +98,15 @@ object SparkConversions { } partitionKeys.toArray } + + def toFlussTableChanges(changes: Seq[TableChange]): Seq[org.apache.fluss.metadata.TableChange] = { 
+ changes.map { + case p: TableChange.SetProperty => + org.apache.fluss.metadata.TableChange.set(p.property(), p.value()) + case p: TableChange.RemoveProperty => + org.apache.fluss.metadata.TableChange.reset(p.property()) + // TODO Add full support for table changes + case _ => throw new UnsupportedOperationException("Unsupported table change") + } + } } diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml index ffa424d261..96d1ccb452 100644 --- a/fluss-spark/fluss-spark-ut/pom.xml +++ b/fluss-spark/fluss-spark-ut/pom.xml @@ -81,6 +81,20 @@ fluss-test-utils + + org.apache.fluss + fluss-lake-paimon + ${project.version} + test + + + + org.apache.paimon + paimon-bundle + ${paimon.version} + test + + org.apache.curator diff --git a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin new file mode 100644 index 0000000000..69bf0f8a4b --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin \ No newline at end of file diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala index f9b98fe7c9..c8d238fffd 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -26,10 +26,8 @@ import org.apache.fluss.metadata.{TableDescriptor, TablePath} import org.apache.fluss.row.InternalRow import org.apache.fluss.server.testutils.FlussClusterExtension -import org.apache.spark.SparkConf import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession -import org.junit.jupiter.api.extension.RegisterExtension import java.time.Duration @@ -37,32 +35,49 @@ import scala.collection.JavaConverters._ class FlussSparkTestBase extends QueryTest with SharedSparkSession { - import FlussSparkTestBase._ - protected val DEFAULT_CATALOG = "fluss_catalog" protected val DEFAULT_DATABASE = "fluss" protected var conn: Connection = _ protected var admin: Admin = _ - override protected def sparkConf: SparkConf = { - super.sparkConf - .set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName) - .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", bootstrapServers) - .set("spark.sql.defaultCatalog", DEFAULT_CATALOG) - // Enable read optimized by default temporarily. 
- // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done. - .set("spark.sql.fluss.readOptimized", "true") - } + val flussServer: FlussClusterExtension = + FlussClusterExtension.builder + .setClusterConf(flussConf) + .setNumOfTabletServers(3) + .build override protected def beforeAll(): Unit = { super.beforeAll() - conn = ConnectionFactory.createConnection(clientConf) + flussServer.start() + conn = ConnectionFactory.createConnection(flussServer.getClientConfig) admin = conn.getAdmin + spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName) + spark.conf.set( + s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", + flussServer.getBootstrapServers) + spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG) + // Enable read optimized by default temporarily. + // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done. + spark.conf.set("spark.sql.fluss.readOptimized", "true") + sql(s"USE $DEFAULT_DATABASE") } + override protected def afterAll(): Unit = { + super.afterAll() + if (admin != null) { + admin.close() + admin = null + } + if (conn != null) { + conn.close() + conn = null + } + flussServer.close() + } + def createTablePath(tableName: String): TablePath = { TablePath.of(DEFAULT_DATABASE, tableName) } @@ -98,21 +113,9 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession { .map(record => (record.getChangeType.shortString(), record.getRow)) .toArray } -} - -@RegisterExtension -object FlussSparkTestBase { - val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension = - FlussClusterExtension.builder - .setClusterConf( - new Configuration() - .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) - ) - .setNumOfTabletServers(3) - .build - FLUSS_CLUSTER_EXTENSION.start() - - val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig - val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers + protected def flussConf: Configuration = { + val conf = new Configuration + conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala index 4c4bec355a..1805f3fc5c 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala @@ -17,6 +17,7 @@ package org.apache.fluss.spark +import org.apache.fluss.config.ConfigOptions import org.apache.fluss.metadata._ import org.apache.fluss.types.{DataTypes, RowType} @@ -192,6 +193,33 @@ class SparkCatalogTest extends FlussSparkTestBase { checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) } + test("Catalog: set/remove table properties") { + withTable("t") { + sql( + s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' = 3)") + var flussTable = admin.getTableInfo(createTablePath("t")).get() + assertResult( + Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"), + "check table properties")(flussTable.getProperties.toMap.asScala) + assert( + flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1") + + sql(s"ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2')") + flussTable = admin.getTableInfo(createTablePath("t")).get() + assertResult( + 
Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"), + "check table properties")(flussTable.getProperties.toMap.asScala) + assert( + flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2") + + sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + flussTable = admin.getTableInfo(createTablePath("t")).get() + assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1")) + + sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + } + } + test("Partition: show partitions") { withTable("t") { sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)") diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala new file mode 100644 index 0000000000..e486a40adc --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.lake.paimon + +import org.apache.fluss.config.{ConfigOptions, Configuration, FlussConfigUtils} +import org.apache.fluss.exception.FlussRuntimeException +import org.apache.fluss.lake.paimon.utils.PaimonConversions +import org.apache.fluss.metadata._ +import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME} +import org.apache.fluss.server.utils.LakeStorageUtils +import org.apache.fluss.spark.SparkCatalogTest +import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER, PRIMARY_KEY} + +import org.scalatest.matchers.must.Matchers.contain +import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper} + +import java.nio.file.Files + +import scala.collection.JavaConverters._ + +class SparkLakePaimonCatalogTest extends SparkCatalogTest { + + private var paimonCatalog: org.apache.paimon.catalog.Catalog = _ + private var warehousePath: String = _ + + override def flussConf: Configuration = { + val conf = super.flussConf + conf.setInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE) + conf.setString("datalake.format", "paimon") + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + try { + warehousePath = + Files.createTempDirectory("fluss-testing-datalake-tiered").resolve("warehouse").toString + } catch { + case e: Exception => + throw new FlussRuntimeException("Failed to create warehouse path") + } + conf.setString("datalake.paimon.warehouse", warehousePath) + paimonCatalog = org.apache.paimon.catalog.CatalogFactory.createCatalog( + org.apache.paimon.catalog.CatalogContext.create( + org.apache.paimon.options.Options.fromMap(LakeStorageUtils.extractLakeProperties(conf)))) + conf + } + + override protected def withTable(tableNames: String*)(f: => Unit): Unit = { + super.withTable(tableNames: _*)(f) + tableNames.foreach( + tableName => + paimonCatalog.dropTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName), + true)) + } + + private def verifyLakePaimonTable( + paimonTable: org.apache.paimon.table.Table, + flussTable: TableInfo, + expectedPaimonRowType: org.apache.paimon.types.RowType, + expectedBucketKey: String, + bucketNum: Int): Unit = { + if (!flussTable.hasPrimaryKey) { + assert(paimonTable.primaryKeys.isEmpty) + } else { + assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "check primary key")( + paimonTable.primaryKeys()) + } + + if (flussTable.isPartitioned) { + assertResult(flussTable.getPartitionKeys, "check partition keys")(paimonTable.partitionKeys()) + } + + assert(flussTable.getNumBuckets == bucketNum) + + if (expectedBucketKey != null) { + assert( + paimonTable + .options() + .asScala + .getOrElse( + org.apache.paimon.CoreOptions.BUCKET.key(), + org.apache.paimon.CoreOptions.BUCKET.defaultValue().toString) + .toInt == bucketNum) + } + + if (flussTable.hasBucketKey) { + assertResult(expectedBucketKey, "check fluss table bucket key")( + flussTable.getBucketKeys.asScala.mkString(",")) + assertResult(expectedBucketKey, "check paimon table bucket key")( + paimonTable.options().get(org.apache.paimon.CoreOptions.BUCKET_KEY.key())) + } + + val expectedProperties = + (flussTable.getProperties.toMap.asScala ++ flussTable.getCustomProperties.toMap.asScala) + .filterNot(_._1.startsWith(FlussConfigUtils.TABLE_PREFIX + "datalake.")) + .map { + case (k, v) => + if (k.startsWith("paimon.")) { + (k.substring("paimon.".length), v) + } else { + 
(s"fluss.$k", v) + } + } + .toMap + paimonTable.options().asScala.should(contain).allElementsOf(expectedProperties) + + val paimonRowType = paimonTable.rowType + assert(paimonRowType.getFieldCount == expectedPaimonRowType.getFieldCount) + paimonRowType.getFields.asScala.zip(expectedPaimonRowType.getFields.asScala).foreach { + case (actual, expect) => + assert(actual.equalsIgnoreFieldId(expect), s"check table schema: $actual vs $expect") + } + + assert(flussTable.getComment.equals(paimonTable.comment()), "check table comments") + } + + test("Lake Catalog: basic table") { + // bucket log table + var tableName = "bucket_log_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // non-bucket log table + tableName = "non_bucket_log_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + + // pk table + tableName = "pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, 
+ org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "id", + "name", + "pk1", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + + // partitioned pk table + tableName = "partitioned_pk_table" + withTable(tableName) { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string, pt1 string) + | PARTITIONED BY (pt1) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id', + | '${PRIMARY_KEY.key()}' = 'pk1, id, pt1', + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get() + val paimonTable = + paimonCatalog.getTable( + org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName)) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT.notNull(), + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply( + "id", + "name", + "pk1", + "pt1", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + "id", + 2 + ) + } + } + + test("Lake Catalog: table with all types") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t + | (c1 boolean, c2 byte, c3 short, c4 int, c5 long, c6 float, c7 double, c8 date, + | c9 timestamp, c10 timestamp_ntz, c11 string, c12 binary, c13 decimal(10, 2), + | c14 array, c15 struct, c16 map) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() + val paimonTable = + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.BOOLEAN, + org.apache.paimon.types.DataTypes.TINYINT, + org.apache.paimon.types.DataTypes.SMALLINT, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.FLOAT, + org.apache.paimon.types.DataTypes.DOUBLE, + org.apache.paimon.types.DataTypes.DATE, + org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + org.apache.paimon.types.DataTypes.TIMESTAMP, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.BYTES, + org.apache.paimon.types.DataTypes.DECIMAL(10, 2), + org.apache.paimon.types.DataTypes.ARRAY(org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes.ROW( + org.apache.paimon.types.DataTypes + .FIELD(0, "a", org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes + .FIELD(1, "b", org.apache.paimon.types.DataTypes.STRING) + ), + org.apache.paimon.types.DataTypes.MAP( + org.apache.paimon.types.DataTypes.STRING.notNull(), + org.apache.paimon.types.DataTypes.INT), + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + 
Array.apply( + "c1", + "c2", + "c3", + "c4", + "c5", + "c6", + "c7", + "c8", + "c9", + "c10", + "c11", + "c12", + "c13", + "c14", + "c15", + "c16", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + } + + test("Lake Catalog: unsettable properties") { + withTable("t") { + val unsettableProperties = + PaimonConversions.PAIMON_UNSETTABLE_OPTIONS.asScala.map(e => s"'$e' = 'v'").mkString(", ") + + intercept[java.util.concurrent.ExecutionException] { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | $unsettableProperties, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + }.getCause.shouldBe(a[org.apache.fluss.exception.InvalidConfigException]) + } + } + + test("Lake Catalog: alter table with lake enabled") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = false, + | '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet') + |""".stripMargin) + intercept[org.apache.paimon.catalog.Catalog.TableNotExistException] { + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + } + + sql( + s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES ('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)") + + val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get() + val paimonTable = + paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t")) + verifyLakePaimonTable( + paimonTable, + flussTable, + org.apache.paimon.types.RowType + .of( + Array.apply( + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.STRING, + org.apache.paimon.types.DataTypes.INT, + org.apache.paimon.types.DataTypes.BIGINT, + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS + ), + Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME) + ), + null, + 2 + ) + } + } + +}
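Usage sketch (not part of the diff, for review context only): with the catalog and database registered as in FlussSparkTestBase (defaultCatalog = fluss_catalog, database "fluss"), the new SparkCatalog.alterTable path is exercised from Spark SQL roughly as below. Only SET/UNSET TBLPROPERTIES are translated by SparkConversions.toFlussTableChanges (into org.apache.fluss.metadata.TableChange.set / TableChange.reset); any other TableChange kind is rejected with an IllegalArgumentException. The ADD COLUMN line is a hypothetical illustration of that rejection, not a case covered by the tests above.

  // assumes the `sql(...)` test helper and catalog setup from FlussSparkTestBase
  sql("CREATE TABLE fluss.t (id int, name string) TBLPROPERTIES('key1' = 'value1')")
  sql("ALTER TABLE fluss.t SET TBLPROPERTIES('key1' = 'value2')")  // -> TableChange.set("key1", "value2")
  sql("ALTER TABLE fluss.t UNSET TBLPROPERTIES('key1')")           // -> TableChange.reset("key1")
  sql("ALTER TABLE fluss.t ADD COLUMN age int")                    // expected to fail: only set/remove properties are supported for now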