diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 35949062e307..a0a1a37a2076 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -73,6 +73,7 @@ import java.io.FileNotFoundException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -476,7 +477,8 @@ public TableCommitImpl newCommit(String commitUser) {
                 options.snapshotExpireExecutionMode(),
                 name(),
                 options.forceCreatingSnapshot(),
-                options.fileOperationThreadNum());
+                options.fileOperationThreadNum(),
+                createPartitionValidators(this));
     }
 
     @Override
@@ -744,6 +746,14 @@ protected RowKindGenerator rowKindGenerator() {
         return RowKindGenerator.create(schema(), store().options());
    }
 
+    private List<PartitionValidator> createPartitionValidators(FileStoreTable table) {
+        List<PartitionValidator> validators = new ArrayList<>();
+        if (table.coreOptions().isChainTable()) {
+            validators.add(new ChainPartitionDropValidator(table));
+        }
+        return validators;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
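Note: the wiring above makes createPartitionValidators the single registration point and routes every partition drop through the validators attached to the commit. A hedged usage sketch of the call path (the helper and commit user are illustrative; newCommit and truncatePartitions are the APIs touched by this patch):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.sink.TableCommitImpl;

    class DropPartitionSketch {
        // Assumes `table` is a chain table resolved on its snapshot branch.
        static void dropDt(FileStoreTable table, String dt) throws Exception {
            Map<String, String> spec = new HashMap<>();
            spec.put("dt", dt);
            try (TableCommitImpl commit = table.newCommit("drop-user")) {
                // ChainPartitionDropValidator now runs before the drop is
                // committed and throws if later delta partitions still
                // depend on `dt`.
                commit.truncatePartitions(Collections.singletonList(spec));
            }
        }
    }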
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainPartitionDropValidator.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainPartitionDropValidator.java
new file mode 100644
index 000000000000..84e608c31143
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainPartitionDropValidator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
+
+/**
+ * Validator for chain table partition drops.
+ *
+ * <p>Checks whether a partition can be dropped based on the chain table's snapshot partition
+ * dependencies.
+ */
+public class ChainPartitionDropValidator implements PartitionValidator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChainPartitionDropValidator.class);
+
+    private final FileStoreTable table;
+    private final CoreOptions coreOptions;
+
+    public ChainPartitionDropValidator(FileStoreTable table) {
+        this.table = table;
+        this.coreOptions = table.coreOptions();
+    }
+
+    @Override
+    public void validatePartitionDrop(List<Map<String, String>> partitionSpecs) {
+        if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
+            return;
+        }
+        FileStoreTable candidateTable = table;
+        if (table instanceof FallbackReadFileStoreTable) {
+            candidateTable =
+                    ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                            .wrapped();
+        }
+        FileStoreTable deltaTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
+        List<BinaryRow> partitions =
+                createBinaryPartitions(
+                        partitionSpecs,
+                        table.schema().logicalPartitionType(),
+                        coreOptions.partitionDefaultName());
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(table.schema().logicalPartitionType());
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        table.schema().logicalPartitionType(),
+                        table.schema().partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+        RecordComparator partitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        table.schema().logicalPartitionType().getFieldTypes());
+        List<BinaryRow> snapshotPartitions =
+                table.newSnapshotReader().partitionEntries().stream()
+                        .map(PartitionEntry::partition)
+                        .sorted(partitionComparator)
+                        .collect(Collectors.toList());
+        SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalPartitionType());
+        for (BinaryRow partition : partitions) {
+            Optional<BinaryRow> preSnapshotPartition =
+                    findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Optional<BinaryRow> nextSnapshotPartition =
+                    findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Predicate deltaFollowingPredicate =
+                    ChainTableUtils.createTriangularPredicate(
+                            partition, partitionConverter, builder::equal, builder::greaterThan);
+            List<BinaryRow> deltaFollowingPartitions =
+                    deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
+                            .partitionEntries().stream()
+                            .map(PartitionEntry::partition)
+                            .filter(
+                                    deltaPartition ->
+                                            isNextIntervalPartition(
+                                                    deltaPartition,
+                                                    nextSnapshotPartition,
+                                                    partitionComparator))
+                            .collect(Collectors.toList());
+            boolean canDrop =
+                    deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
+            LOG.info(
+                    "Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
+                    partitionComputer.generatePartValues(partition),
+                    canDrop,
+                    generatePartitionValues(preSnapshotPartition, partitionComputer),
+                    generatePartitionValues(nextSnapshotPartition, partitionComputer));
+            if (!canDrop) {
+                throw new RuntimeException(
+                        "Snapshot partition "
+                                + partitionComputer.generatePartValues(partition)
+                                + " cannot be dropped: later delta partitions still depend on it.");
+            }
+        }
+    }
+
+    private Optional<BinaryRow> findPreSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        BinaryRow pre = null;
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) < 0) {
+                pre = snapshotPartition;
+            } else {
+                break;
+            }
+        }
+        return Optional.ofNullable(pre);
+    }
+
+    private Optional<BinaryRow> findNextSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) > 0) {
+                return Optional.of(snapshotPartition);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private boolean isNextIntervalPartition(
+            BinaryRow partition,
+            Optional<BinaryRow> nextSnapshotPartition,
+            RecordComparator partitionComparator) {
+        return !nextSnapshotPartition.isPresent()
+                || partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
+    }
+
+    private String generatePartitionValues(
+            Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
+        if (!partition.isPresent()) {
+            return "";
+        }
+        return partitionComputer.generatePartValues(partition.get()).toString();
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
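Note: the droppability rule is easiest to see on plain sorted keys. Below is a self-contained model of the same decision, not the production path: it substitutes String date partitions and a TreeSet for BinaryRow plus the generated comparator, and assumes the equal/greaterThan triangular predicate selects delta partitions at or after the candidate.

    import java.util.Arrays;
    import java.util.List;
    import java.util.TreeSet;
    import java.util.stream.Collectors;

    class ChainDropRuleSketch {

        // A snapshot partition may be dropped if no delta partition in
        // [partition, nextSnapshotPartition) still depends on it, or if an
        // earlier snapshot partition exists to anchor the chain.
        static boolean canDrop(
                TreeSet<String> snapshotPartitions,
                TreeSet<String> deltaPartitions,
                String partition) {
            String pre = snapshotPartitions.lower(partition);   // findPreSnapshotPartition
            String next = snapshotPartitions.higher(partition); // findNextSnapshotPartition
            List<String> deltaFollowing =
                    deltaPartitions.tailSet(partition, true).stream()
                            .filter(d -> next == null || d.compareTo(next) < 0)
                            .collect(Collectors.toList());
            return deltaFollowing.isEmpty() || pre != null;
        }

        public static void main(String[] args) {
            TreeSet<String> snapshot =
                    new TreeSet<>(Arrays.asList("20260101", "20260103", "20260105"));
            TreeSet<String> delta =
                    new TreeSet<>(
                            Arrays.asList(
                                    "20260101", "20260102", "20260103", "20260104", "20260105"));
            System.out.println(canDrop(snapshot, delta, "20260105")); // true
            System.out.println(canDrop(snapshot, delta, "20260101")); // false
        }
    }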
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PartitionValidator.java b/paimon-core/src/main/java/org/apache/paimon/table/PartitionValidator.java
new file mode 100644
index 000000000000..cb1307fad9b1
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PartitionValidator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import java.util.List;
+import java.util.Map;
+
+/** Validator that checks partition operations before they are committed. */
+public interface PartitionValidator extends AutoCloseable {
+
+    void validatePartitionDrop(List<Map<String, String>> partitionSpecs);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 8bd65b05c336..7ba957570de9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -30,6 +30,7 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.PartitionValidator;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagTimeExpire;
@@ -92,6 +93,8 @@ public class TableCommitImpl implements InnerTableCommit {
     private boolean batchCommitted = false;
     private boolean expireForEmptyCommit = true;
 
+    private final List<PartitionValidator> partitionValidators;
+
     public TableCommitImpl(
             FileStoreCommit commit,
             @Nullable Runnable expireSnapshots,
@@ -102,7 +105,8 @@ public TableCommitImpl(
             ExpireExecutionMode expireExecutionMode,
             String tableName,
             boolean forceCreatingSnapshot,
-            int threadNum) {
+            int threadNum,
+            List<PartitionValidator> partitionValidators) {
         if (partitionExpire != null) {
             commit.withPartitionExpire(partitionExpire);
         }
@@ -126,6 +130,10 @@ public TableCommitImpl(
         this.tableName = tableName;
         this.forceCreatingSnapshot = forceCreatingSnapshot;
         this.fileCheckExecutor = FileOperationThreadPool.getExecutorService(threadNum);
+        this.partitionValidators =
+                partitionValidators == null
+                        ? Collections.emptyList()
+                        : Collections.unmodifiableList(new ArrayList<>(partitionValidators));
     }
 
     public boolean forceCreatingSnapshot() {
@@ -184,6 +192,7 @@ public void truncateTable() {
 
     @Override
     public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
+        partitionValidators.forEach(validator -> validator.validatePartitionDrop(partitionSpecs));
         commit.dropPartitions(partitionSpecs, COMMIT_IDENTIFIER);
     }
 
@@ -409,6 +418,13 @@ public void expireSnapshots() {
     public void close() throws Exception {
         commit.close();
         maintainExecutor.shutdownNow();
+        for (PartitionValidator validator : partitionValidators) {
+            try {
+                validator.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close partition validator.", e);
+            }
+        }
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index fd4cdff4d689..ba55001dcaef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -209,4 +209,9 @@ public static boolean isScanFallbackDeltaBranch(CoreOptions options) {
         return options.isChainTable()
                 && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
     }
+
+    public static boolean isScanFallbackSnapshotBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackSnapshotBranch().equalsIgnoreCase(options.branch());
+    }
 }
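Note: the interface is deliberately small, one hook per operation plus AutoCloseable, so TableCommitImpl can own the validator lifecycle (see the close() loop above). A minimal sketch of another implementation against this interface; the class name and rule are hypothetical, only the interface comes from this patch.

    package org.apache.paimon.table;

    import java.util.List;
    import java.util.Map;

    /** Hypothetical validator that refuses to drop one protected partition. */
    public class ProtectedPartitionValidator implements PartitionValidator {

        private static final String PROTECTED_DT = "20260101";

        @Override
        public void validatePartitionDrop(List<Map<String, String>> partitionSpecs) {
            for (Map<String, String> spec : partitionSpecs) {
                if (PROTECTED_DT.equals(spec.get("dt"))) {
                    throw new RuntimeException(
                            "Partition dt=" + PROTECTED_DT + " is protected and cannot be dropped.");
                }
            }
        }

        @Override
        public void close() throws Exception {
            // Nothing to release here; close() exists because validators may
            // hold readers or other resources.
        }
    }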
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index b5a28fe50010..8f25a3841a26 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -33,6 +33,8 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
 public class SparkChainTableITCase {
@@ -53,70 +55,10 @@ public static void closeMetastore() throws Exception {
 
     @Test
     public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
-        SparkSession spark = builder.getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE spark_catalog.my_db1");
-
-        /** Create table */
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS \n"
-                        + " `my_db1`.`chain_test` (\n"
-                        + " `t1` BIGINT COMMENT 't1',\n"
-                        + " `t2` BIGINT COMMENT 't2',\n"
-                        + " `t3` STRING COMMENT 't3'\n"
-                        + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
-                        + "WITH\n"
-                        + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
-                        + " 'bucket-key' = 't1',\n"
-                        + " 'primary-key' = 'dt,t1',\n"
-                        + " 'partition.timestamp-pattern' = '$dt',\n"
-                        + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
-                        + " 'chain-table.enabled' = 'true',\n"
-                        + " 'bucket' = '2',\n"
-                        + " 'merge-engine' = 'deduplicate', \n"
-                        + " 'sequence.field' = 't2'\n"
-                        + " )");
-
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.close();
-        spark = builder.getOrCreate();
+        SparkSession.Builder builder = initSparkClient(tempDir);
+        initDailyChainTable(builder);
 
+        SparkSession spark = builder.getOrCreate();
         /** Write main branch */
         spark.sql(
                 "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');");
@@ -269,80 +211,16 @@ public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOExcepti
         spark.sql(
                 "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250814'");
         assertThat(df.count()).isEqualTo(1);
-        spark.close();
-        spark = builder.getOrCreate();
-        /** Drop table */
-        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-        spark.close();
+        dropTable(builder);
     }
 
     @Test
     public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
+        SparkSession.Builder builder = initSparkClient(tempDir);
+        initHourlyChainTable(builder);
         SparkSession spark = builder.getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE spark_catalog.my_db1");
-
-        /** Create table */
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS \n"
-                        + " `my_db1`.`chain_test` (\n"
-                        + " `t1` BIGINT COMMENT 't1',\n"
-                        + " `t2` BIGINT COMMENT 't2',\n"
-                        + " `t3` STRING COMMENT 't3'\n"
-                        + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
-                        + "WITH\n"
-                        + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
-                        + " 'bucket-key' = 't1',\n"
-                        + " 'primary-key' = 'dt,hour,t1',\n"
-                        + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n"
-                        + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n"
-                        + " 'chain-table.enabled' = 'true',\n"
-                        + " 'bucket' = '2',\n"
-                        + " 'merge-engine' = 'deduplicate', \n"
-                        + " 'sequence.field' = 't2'\n"
-                        + " )");
-
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.close();
-        spark = builder.getOrCreate();
 
         /** Write main branch */
         spark.sql(
@@ -502,8 +380,173 @@ public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOE
         spark.close();
-        spark = builder.getOrCreate();
-        /** Drop table */
-        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-        spark.close();
+        dropTable(builder);
     }
 
+    @Test
+    public void testDropSnapshotPartition(@TempDir java.nio.file.Path tempDir) throws IOException {
+        SparkSession.Builder builder = initSparkClient(tempDir);
+        initDailyChainTable(builder);
+
+        SparkSession spark = builder.getOrCreate();
+        /** Write delta branch */
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260101') values (1, 1, '1'),(2, 1, '1');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260102') values (1, 2, '1-1' ),(3, 1, '1' );");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260103') values (2, 2, '1-1' ),(4, 1, '1' );");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260104') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260105') values (5, 1, '1' ),(6, 1, '1' );");
+
+        /** Write snapshot branch */
+        spark.sql("set spark.paimon.branch=snapshot;");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260101') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260103') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+        spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20260105') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+        spark.close();
+
+        final SparkSession session = builder.getOrCreate();
+        assertThatNoException()
+                .isThrownBy(
+                        () ->
+                                session.sql(
+                                        "alter table `my_db1`.`chain_test$branch_snapshot` drop partition (dt = '20260105');"));
+        assertThatThrownBy(
+                () ->
+                        session.sql(
+                                "alter table `my_db1`.`chain_test$branch_snapshot` drop partition (dt = '20260101');"));
+        session.close();
+
+        dropTable(builder);
+    }
+
+    private SparkSession.Builder initSparkClient(java.nio.file.Path tempDir) {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        // with hive metastore
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                                "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.warehouse",
+                                warehousePath.toString())
+                        .config(
+                                "spark.sql.extensions",
+                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+                        .master("local[2]");
+        return builder;
+    }
+
+    private void initDailyChainTable(SparkSession.Builder builder) throws IOException {
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        /** Create table */
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + " `my_db1`.`chain_test` (\n"
+                        + " `t1` BIGINT COMMENT 't1',\n"
+                        + " `t2` BIGINT COMMENT 't2',\n"
+                        + " `t3` STRING COMMENT 't3'\n"
+                        + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+                        + " 'bucket-key' = 't1',\n"
+                        + " 'primary-key' = 'dt,t1',\n"
+                        + " 'partition.timestamp-pattern' = '$dt',\n"
+                        + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+                        + " 'chain-table.enabled' = 'true',\n"
+                        + " 'bucket' = '2',\n"
+                        + " 'merge-engine' = 'deduplicate', \n"
+                        + " 'sequence.field' = 't2'\n"
+                        + " )");
+
+        /** Create branch */
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+        /** Set branch */
+        spark.sql(
+                "ALTER TABLE my_db1.chain_test SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+
+        spark.close();
+    }
+
+    private void initHourlyChainTable(SparkSession.Builder builder) throws IOException {
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        /** Create table */
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + " `my_db1`.`chain_test` (\n"
+                        + " `t1` BIGINT COMMENT 't1',\n"
+                        + " `t2` BIGINT COMMENT 't2',\n"
+                        + " `t3` STRING COMMENT 't3'\n"
+                        + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+                        + " 'bucket-key' = 't1',\n"
+                        + " 'primary-key' = 'dt,hour,t1',\n"
+                        + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n"
+                        + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n"
+                        + " 'chain-table.enabled' = 'true',\n"
+                        + " 'bucket' = '2',\n"
+                        + " 'merge-engine' = 'deduplicate', \n"
+                        + " 'sequence.field' = 't2'\n"
+                        + " )");
+
+        /** Create branch */
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+        /** Set branch */
+        spark.sql(
+                "ALTER TABLE my_db1.chain_test SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.close();
+    }
+
+    private void dropTable(SparkSession.Builder builder) throws IOException {
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+        spark.close();
+    }
 }
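Note: the assertThatThrownBy call in testDropSnapshotPartition only asserts that some exception escapes. If the test should also pin down the failure mode, AssertJ can chain a check; a sketch, where hasStackTraceContaining is the safer variant since Spark may wrap the validator's RuntimeException (the message text matches the exception thrown by ChainPartitionDropValidator as written above):

    assertThatThrownBy(
                    () ->
                            session.sql(
                                    "alter table `my_db1`.`chain_test$branch_snapshot` "
                                            + "drop partition (dt = '20260101');"))
            .hasStackTraceContaining("cannot be dropped");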