From 472a700b9e6c88433395d0ef19dd3f5a998cb5cb Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Tue, 9 Jun 2026 17:52:46 +0800 Subject: [PATCH 1/4] [core] Support arbitrary time granularity for chain table delta computation --- .../utils/ChainPartitionStepExtractor.java | 315 ++++++++++++++++++ .../apache/paimon/utils/ChainTableUtils.java | 61 ++-- .../ChainPartitionStepExtractorTest.java | 75 +++++ .../paimon/utils/ChainTableUtilsTest.java | 80 +++++ .../paimon/spark/SparkChainTableITCase.java | 57 ++++ 5 files changed, 547 insertions(+), 41 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java new file mode 100644 index 000000000000..bc5a007cc69b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.Period; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAmount; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Extracts the time step duration from a timestamp pattern and formatter for chain table + * partitions. + */ +public class ChainPartitionStepExtractor { + + private static final LocalDateTime FINGERPRINT = LocalDateTime.of(2026, 6, 9, 11, 50, 58); + + private static final Pattern VARIABLE = Pattern.compile("\\$[a-zA-Z_]+"); + private final String pattern; + private final String formatter; + + public ChainPartitionStepExtractor(String pattern, String formatter) { + if (pattern == null) { + throw new IllegalArgumentException("pattern cannot be null"); + } + if (formatter == null) { + throw new IllegalArgumentException("formatter cannot be null"); + } + this.pattern = pattern; + this.formatter = formatter; + } + + /** + * Extracts the minimum time step from the given pattern and formatter. + * + * @return the smallest {@link Duration} or {@link Period} step among variable-controlled time + * units + */ + public TemporalAmount extractMinStep() { + List spans = parseFormatter(formatter); + List fragments = splitPattern(pattern); + + boolean hasConstant = fragments.stream().anyMatch(f -> !f.isVariable); + ChronoField field; + if (!hasConstant) { + field = minField(spans); + } else { + List varRanges = matchFragments(fragments, formatter); + field = minFieldInRanges(spans, varRanges); + } + return toTemporalAmount(field); + } + + /** Parses formatter into time spans with their positions. */ + private static List parseFormatter(String formatter) { + String fingerprint = DateTimeFormatter.ofPattern(formatter).format(FINGERPRINT); + + if (fingerprint.length() != formatter.length()) { + throw new IllegalArgumentException( + "Formatter with escapes or variable length not supported: " + formatter); + } + + List spans = new ArrayList<>(); + int i = 0; + while (i < formatter.length()) { + char c = formatter.charAt(i); + if (isTimeChar(c)) { + int start = i; + while (i < formatter.length() && formatter.charAt(i) == c) { + i++; + } + spans.add(new TimeSpan(resolveField(fingerprint.substring(start, i)), start, i)); + } else { + i++; + } + } + if (spans.isEmpty()) { + throw new IllegalArgumentException("No time unit found in formatter: " + formatter); + } + return spans; + } + + private static boolean isTimeChar(char c) { + return "yMdHhmsS".indexOf(c) >= 0; + } + + private static ChronoField resolveField(String value) { + long v = Long.parseLong(value); + if (v == FINGERPRINT.getSecond()) return ChronoField.SECOND_OF_MINUTE; + if (v == FINGERPRINT.getMinute()) return ChronoField.MINUTE_OF_HOUR; + if (v == FINGERPRINT.getHour()) return ChronoField.HOUR_OF_DAY; + if (v == FINGERPRINT.getDayOfMonth()) return ChronoField.DAY_OF_MONTH; + if (v == FINGERPRINT.getMonthValue()) return ChronoField.MONTH_OF_YEAR; + if (v == FINGERPRINT.getYear() || v == FINGERPRINT.getYear() % 100) { + return ChronoField.YEAR; + } + throw new IllegalArgumentException("Unknown time unit value: " + value); + } + + private static List splitPattern(String pattern) { + List fragments = new ArrayList<>(); + Matcher m = VARIABLE.matcher(pattern); + int last = 0; + + while (m.find()) { + if (m.start() > last) { + fragments.add(new Fragment(pattern, last, m.start(), false)); + } + last = m.end(); + fragments.add(new Fragment(pattern, m.start(), last, true)); + } + if (last < pattern.length()) { + fragments.add(new Fragment(pattern, last, pattern.length(), false)); + } + return fragments; + } + + /** Matches constants to formatter positions, then derives variable ranges from gaps. */ + private static List matchFragments(List fragments, String formatter) { + // Step 1: Match all constants sequentially to ensure order and no overlap + List constantRanges = matchAllConstants(fragments, formatter); + + // Step 2: Generate variable ranges from gaps between constants + int totalLength = formatter.length(); + List variableRanges = new ArrayList<>(); + int previousEnd = 0; + + for (int[] constant : constantRanges) { + int constantStart = constant[0]; + if (constantStart > previousEnd) { + variableRanges.add(new int[] {previousEnd, constantStart}); + } + previousEnd = constant[1]; + } + + // Add the range after the last constant + if (previousEnd < totalLength) { + variableRanges.add(new int[] {previousEnd, totalLength}); + } + + return variableRanges; + } + + /** + * Matches all constant fragments in order, ensuring each constant matches after the previous + * one. + */ + private static List matchAllConstants(List fragments, String formatter) { + List constantRanges = new ArrayList<>(); + int lastMatchedEnd = 0; + int formatterLen = formatter.length(); + + for (int i = 0; i < fragments.size(); i++) { + Fragment fragment = fragments.get(i); + if (!fragment.isVariable) { + String constant = fragment.text; + int constLen = constant.length(); + + if (constLen > formatterLen - lastMatchedEnd) { + throw new IllegalArgumentException( + String.format( + "Constant '%s' exceeds remaining formatter length", constant)); + } + + int pos = findConstantPos(constant, formatter, i, fragments.size(), lastMatchedEnd); + if (pos == -1) { + throw new IllegalArgumentException( + String.format( + "Constant '%s' not found after position %d in formatter", + constant, lastMatchedEnd)); + } + + constantRanges.add(new int[] {pos, pos + constLen}); + lastMatchedEnd = pos + constLen; + } + } + + return constantRanges; + } + + private static int findConstantPos( + String constant, String formatter, int fragIndex, int fragCount, int startFrom) { + int constLen = constant.length(); + int maxStart = formatter.length() - constLen; + if (fragIndex == 0) { + return 0; + } else if (fragIndex == fragCount - 1) { + return maxStart; + } else { + for (int s = startFrom; s <= maxStart; s++) { + if (matchConstant(constant, formatter, s)) { + return s; + } + } + return -1; + } + } + + /** + * Checks if constant matches formatter at the given position. A digit in the constant must + * correspond to a time unit character in the formatter, while non-digit characters must match + * exactly. + */ + private static boolean matchConstant(String constant, String formatter, int start) { + for (int i = 0; i < constant.length(); i++) { + char c = constant.charAt(i); + char f = formatter.charAt(start + i); + if (Character.isDigit(c)) { + if (!isTimeChar(f)) return false; + } else if (c != f) { + return false; + } + } + return true; + } + + private static ChronoField minField(List spans) { + ChronoField min = spans.get(0).field; + for (int i = 1; i < spans.size(); i++) { + if (spans.get(i).field.ordinal() < min.ordinal()) { + min = spans.get(i).field; + } + } + return min; + } + + private static ChronoField minFieldInRanges(List spans, List ranges) { + ChronoField min = null; + int start = 0; + for (int[] range : ranges) { + for (int i = start; i < spans.size(); i++) { + TimeSpan span = spans.get(i); + if (span.start >= range[0] && span.end <= range[1]) { + if (min == null || span.field.ordinal() < min.ordinal()) { + min = span.field; + start = i + 1; + } + } + } + } + if (min == null) { + throw new IllegalArgumentException("No time unit found in variable ranges"); + } + return min; + } + + private static TemporalAmount toTemporalAmount(ChronoField field) { + switch (field) { + case SECOND_OF_MINUTE: + return Duration.ofSeconds(1); + case MINUTE_OF_HOUR: + return Duration.ofMinutes(1); + case HOUR_OF_DAY: + return Duration.ofHours(1); + case DAY_OF_MONTH: + return Duration.ofDays(1); + case MONTH_OF_YEAR: + return Period.ofMonths(1); + case YEAR: + return Period.ofYears(1); + default: + throw new IllegalArgumentException("Unsupported field: " + field); + } + } + + /** Represents a time unit span within the formatter string. */ + private static class TimeSpan { + final ChronoField field; + final int start; + final int end; + + TimeSpan(ChronoField field, int start, int end) { + this.field = field; + this.start = start; + this.end = end; + } + } + + /** Represents a fragment of the pattern (either variable or constant). */ + private static class Fragment { + final String text; + final boolean isVariable; + final int start; + final int end; + + Fragment(String pattern, int start, int end, boolean isVariable) { + this.start = start; + this.end = end; + this.text = pattern.substring(start, end); + this.isVariable = isVariable; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java index e6d4e4aadf3c..3916d4fc4ce1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java @@ -32,6 +32,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAmount; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -75,11 +76,9 @@ public static List getDeltaPartitions( List partitionColumns, RowType partType, CoreOptions options, - RecordComparator partitionComparator, InternalRowPartitionComputer partitionComputer) { InternalRowSerializer serializer = new InternalRowSerializer(partType); List deltaPartitions = new ArrayList<>(); - boolean isDailyPartition = partitionColumns.size() == 1; List startPartitionValues = new ArrayList<>(partitionComputer.generatePartValues(beginPartition).values()); List endPartitionValues = @@ -89,47 +88,28 @@ public static List getDeltaPartitions( options.partitionTimestampPattern(), options.partitionTimestampFormatter()); LocalDateTime stratPartitionTime = timeExtractor.extract(partitionColumns, startPartitionValues); - LocalDateTime candidateTime = stratPartitionTime; LocalDateTime endPartitionTime = timeExtractor.extract(partitionColumns, endPartitionValues); + ChainPartitionStepExtractor stepExtractor = + new ChainPartitionStepExtractor( + options.partitionTimestampPattern(), options.partitionTimestampFormatter()); + TemporalAmount step = stepExtractor.extractMinStep(); + LocalDateTime candidateTime = stratPartitionTime.plus(step); while (!candidateTime.isAfter(endPartitionTime)) { - if (isDailyPartition) { - if (candidateTime.isAfter(stratPartitionTime)) { - deltaPartitions.add( - serializer - .toBinaryRow( - InternalRowPartitionComputer.convertSpecToInternalRow( - calPartValues( - candidateTime, - partitionColumns, - options.partitionTimestampPattern(), - options.partitionTimestampFormatter()), - partType, - options.partitionDefaultName())) - .copy()); - } - } else { - for (int hour = 0; hour <= 23; hour++) { - candidateTime = candidateTime.toLocalDate().atStartOfDay().plusHours(hour); - BinaryRow candidatePartition = - serializer - .toBinaryRow( - InternalRowPartitionComputer.convertSpecToInternalRow( - calPartValues( - candidateTime, - partitionColumns, - options.partitionTimestampPattern(), - options.partitionTimestampFormatter()), - partType, - options.partitionDefaultName())) - .copy(); - if (partitionComparator.compare(candidatePartition, beginPartition) > 0 - && partitionComparator.compare(candidatePartition, endPartition) <= 0) { - deltaPartitions.add(candidatePartition); - } - } - } - candidateTime = candidateTime.toLocalDate().plusDays(1).atStartOfDay(); + BinaryRow candidatePartition = + serializer + .toBinaryRow( + InternalRowPartitionComputer.convertSpecToInternalRow( + calPartValues( + candidateTime, + partitionColumns, + options.partitionTimestampPattern(), + options.partitionTimestampFormatter()), + partType, + options.partitionDefaultName())) + .copy(); + deltaPartitions.add(candidatePartition); + candidateTime = candidateTime.plus(step); } return deltaPartitions; } @@ -312,7 +292,6 @@ public static List getDeltaPartitionsWithProjector( chainPartitionColumns, chainPartType, options, - chainPartitionComparator, chainPartitionComputer); // Combine each chain-only BinaryRow with the group part into a full partition diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java new file mode 100644 index 000000000000..f128e2d6a53e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.Period; +import java.time.temporal.TemporalAmount; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ChainPartitionStepExtractor}. */ +public class ChainPartitionStepExtractorTest { + private TemporalAmount extractMinStep(@Nullable String pattern, @Nullable String formatter) { + return new ChainPartitionStepExtractor(pattern, formatter).extractMinStep(); + } + + @Test + public void testVariablePatternsWithDuration() { + assertThat(extractMinStep("$ab$c $d:$e:$f", "yyyyMMdd HH:mm:ss")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("$day $a:$b", "yyyyMMdd HH:mm")).isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("$day$a", "yyyy/MM/ddHH")).isEqualTo(Duration.ofHours(1)); + + assertThat(extractMinStep("$a $b", "HH:mm:ss yyyyMMdd")).isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12:$a $b", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12:$a:01 $b", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("12:02:01 $b", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$hour:00:00 $date", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofHours(1)); + assertThat(extractMinStep("00:00:00 $b", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$hour_minute:01 $date", "HH:mm:ss yyyyMMdd")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("12:$a $b", "HH:mm:ss yyMMdd")).isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12$b", "HHmmss")).isEqualTo(Duration.ofSeconds(1)); + } + + @Test + public void testVariablePatternsWithPeriod() { + assertThat(extractMinStep("$a$b", "yyyyMMdd")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a01", "yyyyMMdd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("202601$a", "yyyyMMdd")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("2026$a01", "yyyyMMdd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a1201", "yyyyMMdd")).isEqualTo(Period.ofYears(1)); + assertThat(extractMinStep("$year_month01", "yyyyMMdd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$year1201", "yyyyMMdd")).isEqualTo(Period.ofYears(1)); + + assertThat(extractMinStep("$a01", "yyMMdd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a1201", "yyMMdd")).isEqualTo(Period.ofYears(1)); + assertThat(extractMinStep("$year_month$day", "yyMMdd")).isEqualTo(Duration.ofDays(1)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java index 971407b92815..95c488de7f61 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java @@ -767,4 +767,84 @@ public void testFindFirstLatestPartitionsWithMultiChainKeys() { assertThat(getString(matched2, 0)).isEqualTo("20250809"); assertThat(getString(matched2, 1)).isEqualTo("02"); } + + @Test + public void testGetDeltaPartitionsWithHourMinuteGranularity() { + // partition keys: (region, dt, hour_minute), chain keys: (dt, hour_minute) + RowType fullType = + RowType.builder() + .field("region", DataTypes.STRING().notNull()) + .field("dt", DataTypes.STRING().notNull()) + .field("hour_minute", DataTypes.STRING().notNull()) + .build(); + + ChainPartitionProjector projector = new ChainPartitionProjector(fullType, 2); + + // Compare chain partition (dt, hour_minute) lexicographically + RecordComparator chainComparator = (a, b) -> a.getString(1).compareTo(b.getString(1)); + + Options opts = new Options(); + opts.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$dt $hour_minute:00"); + opts.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd HH:mm:ss"); + CoreOptions options = new CoreOptions(opts); + + BinaryRow begin = row(Lists.newArrayList("CN", "20260609", "10:10")); + BinaryRow end = row(Lists.newArrayList("CN", "20260609", "10:15")); + + List deltas = + ChainTableUtils.getDeltaPartitionsWithProjector( + begin, end, options, chainComparator, projector); + + assertThat(deltas).hasSize(5); + for (BinaryRow delta : deltas) { + assertThat(getString(delta, 0)).isEqualTo("CN"); + assertThat(getString(delta, 1)).isEqualTo("20260609"); + } + assertThat(getString(deltas.get(0), 2)).isEqualTo("10:11"); + assertThat(getString(deltas.get(1), 2)).isEqualTo("10:12"); + assertThat(getString(deltas.get(2), 2)).isEqualTo("10:13"); + assertThat(getString(deltas.get(3), 2)).isEqualTo("10:14"); + assertThat(getString(deltas.get(4), 2)).isEqualTo("10:15"); + } + + @Test + public void testGetDeltaPartitionsWithSeparateHourAndMinute() { + // partition keys: (region, dt, hour, minute), chain keys: (dt, hour, minute) + RowType fullType = + RowType.builder() + .field("region", DataTypes.STRING().notNull()) + .field("dt", DataTypes.STRING().notNull()) + .field("hour", DataTypes.STRING().notNull()) + .field("minute", DataTypes.STRING().notNull()) + .build(); + + ChainPartitionProjector projector = new ChainPartitionProjector(fullType, 3); + + // Compare chain partition (dt, hour, minute) lexicographically + RecordComparator chainComparator = (a, b) -> a.getString(2).compareTo(b.getString(2)); + + Options opts = new Options(); + opts.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$dt $hour:$minute:00"); + opts.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd HH:mm:ss"); + CoreOptions options = new CoreOptions(opts); + + BinaryRow begin = row(Lists.newArrayList("CN", "20260609", "10", "10")); + BinaryRow end = row(Lists.newArrayList("CN", "20260609", "10", "15")); + + List deltas = + ChainTableUtils.getDeltaPartitionsWithProjector( + begin, end, options, chainComparator, projector); + + assertThat(deltas).hasSize(5); + for (BinaryRow delta : deltas) { + assertThat(getString(delta, 0)).isEqualTo("CN"); + assertThat(getString(delta, 1)).isEqualTo("20260609"); + assertThat(getString(delta, 2)).isEqualTo("10"); + } + assertThat(getString(deltas.get(0), 3)).isEqualTo("11"); + assertThat(getString(deltas.get(1), 3)).isEqualTo("12"); + assertThat(getString(deltas.get(2), 3)).isEqualTo("13"); + assertThat(getString(deltas.get(3), 3)).isEqualTo("14"); + assertThat(getString(deltas.get(4), 3)).isEqualTo("15"); + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java index af457ac63fe5..26d7735525f5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java @@ -2420,4 +2420,61 @@ public void testChainTableWithMultiChainKeys(@TempDir java.nio.file.Path tempDir spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); spark.close(); } + + @Test + public void testChainTableWithMinuteLevelPartitions(@TempDir java.nio.file.Path tempDir) + throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = createSparkSessionBuilder(warehousePath); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + spark.sql( + "CREATE TABLE `chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + ") PARTITIONED BY (`dt` STRING, `hr_min` STRING)\n" + + "TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,hr_min,t1',\n" + + " 'partition.timestamp-pattern' = '$dt $hr_min:00',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '1',\n" + + " 'merge-engine' = 'deduplicate',\n" + + " 'sequence.field' = 't2',\n" + + " 'chain-table.chain-partition-keys' = 'dt,hr_min'\n" + + ");"); + + setupChainTableBranches(spark, "chain_test"); + + spark.sql( + "INSERT INTO TABLE `chain_test$branch_snapshot` PARTITION (dt = '20250810', hr_min='01:01') VALUES (3, 1, '3');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_snapshot` PARTITION (dt = '20250810', hr_min='03:30') VALUES (4, 1, '4');"); + + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:35') VALUES (5, 1, '5');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:40') VALUES (6, 1, '6');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:45') VALUES (7, 1, '7');"); + + assertThat( + spark + .sql( + "select * from `chain_test` where dt='20250810' and hr_min='03:40'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[4,1,4,20250810,03:40]", + "[5,1,5,20250810,03:40]", + "[6,1,6,20250810,03:40]"); + + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + spark.close(); + } } From 4591d68d8be945b78b26a80803329816b4f021c6 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Wed, 10 Jun 2026 09:00:19 +0800 Subject: [PATCH 2/4] [core] Optimize checkArgument --- .../utils/ChainPartitionStepExtractor.java | 52 ++++++++----------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java index bc5a007cc69b..d3c02f3c9ff8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java @@ -29,6 +29,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** * Extracts the time step duration from a timestamp pattern and formatter for chain table * partitions. @@ -42,12 +44,8 @@ public class ChainPartitionStepExtractor { private final String formatter; public ChainPartitionStepExtractor(String pattern, String formatter) { - if (pattern == null) { - throw new IllegalArgumentException("pattern cannot be null"); - } - if (formatter == null) { - throw new IllegalArgumentException("formatter cannot be null"); - } + checkArgument(pattern != null, "pattern cannot be null"); + checkArgument(formatter != null, "formatter cannot be null"); this.pattern = pattern; this.formatter = formatter; } @@ -77,10 +75,10 @@ public TemporalAmount extractMinStep() { private static List parseFormatter(String formatter) { String fingerprint = DateTimeFormatter.ofPattern(formatter).format(FINGERPRINT); - if (fingerprint.length() != formatter.length()) { - throw new IllegalArgumentException( - "Formatter with escapes or variable length not supported: " + formatter); - } + checkArgument( + fingerprint.length() == formatter.length(), + "Formatter with escapes or variable length not supported: %s", + formatter); List spans = new ArrayList<>(); int i = 0; @@ -96,9 +94,7 @@ private static List parseFormatter(String formatter) { i++; } } - if (spans.isEmpty()) { - throw new IllegalArgumentException("No time unit found in formatter: " + formatter); - } + checkArgument(!spans.isEmpty(), "No time unit found in formatter: %s", formatter); return spans; } @@ -116,7 +112,7 @@ private static ChronoField resolveField(String value) { if (v == FINGERPRINT.getYear() || v == FINGERPRINT.getYear() % 100) { return ChronoField.YEAR; } - throw new IllegalArgumentException("Unknown time unit value: " + value); + throw new IllegalStateException("Unknown time unit value: " + value); } private static List splitPattern(String pattern) { @@ -178,19 +174,17 @@ private static List matchAllConstants(List fragments, String fo String constant = fragment.text; int constLen = constant.length(); - if (constLen > formatterLen - lastMatchedEnd) { - throw new IllegalArgumentException( - String.format( - "Constant '%s' exceeds remaining formatter length", constant)); - } + checkArgument( + constLen <= formatterLen - lastMatchedEnd, + "Constant '%s' exceeds remaining formatter length", + constant); int pos = findConstantPos(constant, formatter, i, fragments.size(), lastMatchedEnd); - if (pos == -1) { - throw new IllegalArgumentException( - String.format( - "Constant '%s' not found after position %d in formatter", - constant, lastMatchedEnd)); - } + checkArgument( + pos >= 0, + "Constant '%s' not found after position %d in formatter", + constant, + lastMatchedEnd); constantRanges.add(new int[] {pos, pos + constLen}); lastMatchedEnd = pos + constLen; @@ -260,9 +254,7 @@ private static ChronoField minFieldInRanges(List spans, List ra } } } - if (min == null) { - throw new IllegalArgumentException("No time unit found in variable ranges"); - } + checkArgument(min != null, "No time unit found in variable ranges"); return min; } @@ -281,11 +273,10 @@ private static TemporalAmount toTemporalAmount(ChronoField field) { case YEAR: return Period.ofYears(1); default: - throw new IllegalArgumentException("Unsupported field: " + field); + throw new IllegalStateException("Unsupported field: " + field); } } - /** Represents a time unit span within the formatter string. */ private static class TimeSpan { final ChronoField field; final int start; @@ -298,7 +289,6 @@ private static class TimeSpan { } } - /** Represents a fragment of the pattern (either variable or constant). */ private static class Fragment { final String text; final boolean isVariable; From 9130b2eeec7e095c1dc1352db1e23df0fcc3fa46 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Wed, 10 Jun 2026 09:21:23 +0800 Subject: [PATCH 3/4] [core] Fix style --- .../utils/ChainPartitionStepExtractor.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java index d3c02f3c9ff8..929c45e8cf06 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java @@ -38,7 +38,7 @@ public class ChainPartitionStepExtractor { private static final LocalDateTime FINGERPRINT = LocalDateTime.of(2026, 6, 9, 11, 50, 58); - + private static final String TIME_UNIT_CHARS = "yMdHhmsS"; private static final Pattern VARIABLE = Pattern.compile("\\$[a-zA-Z_]+"); private final String pattern; private final String formatter; @@ -99,16 +99,26 @@ private static List parseFormatter(String formatter) { } private static boolean isTimeChar(char c) { - return "yMdHhmsS".indexOf(c) >= 0; + return TIME_UNIT_CHARS.indexOf(c) >= 0; } private static ChronoField resolveField(String value) { long v = Long.parseLong(value); - if (v == FINGERPRINT.getSecond()) return ChronoField.SECOND_OF_MINUTE; - if (v == FINGERPRINT.getMinute()) return ChronoField.MINUTE_OF_HOUR; - if (v == FINGERPRINT.getHour()) return ChronoField.HOUR_OF_DAY; - if (v == FINGERPRINT.getDayOfMonth()) return ChronoField.DAY_OF_MONTH; - if (v == FINGERPRINT.getMonthValue()) return ChronoField.MONTH_OF_YEAR; + if (v == FINGERPRINT.getSecond()) { + return ChronoField.SECOND_OF_MINUTE; + } + if (v == FINGERPRINT.getMinute()) { + return ChronoField.MINUTE_OF_HOUR; + } + if (v == FINGERPRINT.getHour()) { + return ChronoField.HOUR_OF_DAY; + } + if (v == FINGERPRINT.getDayOfMonth()) { + return ChronoField.DAY_OF_MONTH; + } + if (v == FINGERPRINT.getMonthValue()) { + return ChronoField.MONTH_OF_YEAR; + } if (v == FINGERPRINT.getYear() || v == FINGERPRINT.getYear() % 100) { return ChronoField.YEAR; } @@ -222,7 +232,9 @@ private static boolean matchConstant(String constant, String formatter, int star char c = constant.charAt(i); char f = formatter.charAt(start + i); if (Character.isDigit(c)) { - if (!isTimeChar(f)) return false; + if (!isTimeChar(f)) { + return false; + } } else if (c != f) { return false; } From b11e9c21f83b8bc49862f65c7186c0befd089c0c Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Wed, 10 Jun 2026 18:17:03 +0800 Subject: [PATCH 4/4] [core] Fix code style --- .../utils/ChainPartitionStepExtractor.java | 52 ++++++------------- .../ChainPartitionStepExtractorTest.java | 6 ++- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java index 929c45e8cf06..7c5f165d069b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainPartitionStepExtractor.java @@ -59,16 +59,9 @@ public ChainPartitionStepExtractor(String pattern, String formatter) { public TemporalAmount extractMinStep() { List spans = parseFormatter(formatter); List fragments = splitPattern(pattern); - - boolean hasConstant = fragments.stream().anyMatch(f -> !f.isVariable); - ChronoField field; - if (!hasConstant) { - field = minField(spans); - } else { - List varRanges = matchFragments(fragments, formatter); - field = minFieldInRanges(spans, varRanges); - } - return toTemporalAmount(field); + List varRanges = matchFragments(fragments, formatter); + ChronoField field = minFieldInRanges(spans, varRanges); + return stepOf(field); } /** Parses formatter into time spans with their positions. */ @@ -132,13 +125,13 @@ private static List splitPattern(String pattern) { while (m.find()) { if (m.start() > last) { - fragments.add(new Fragment(pattern, last, m.start(), false)); + fragments.add(new Fragment(pattern.substring(last, m.start()), false)); } + fragments.add(new Fragment(m.group(), true)); last = m.end(); - fragments.add(new Fragment(pattern, m.start(), last, true)); } if (last < pattern.length()) { - fragments.add(new Fragment(pattern, last, pattern.length(), false)); + fragments.add(new Fragment(pattern.substring(last), false)); } return fragments; } @@ -208,18 +201,21 @@ private static int findConstantPos( String constant, String formatter, int fragIndex, int fragCount, int startFrom) { int constLen = constant.length(); int maxStart = formatter.length() - constLen; - if (fragIndex == 0) { - return 0; - } else if (fragIndex == fragCount - 1) { - return maxStart; + if (fragIndex == fragCount - 1) { + // Last fragment: match from the end to ensure it occupies the trailing positions + for (int s = maxStart; s >= startFrom; s--) { + if (matchConstant(constant, formatter, s)) { + return s; + } + } } else { for (int s = startFrom; s <= maxStart; s++) { if (matchConstant(constant, formatter, s)) { return s; } } - return -1; } + return -1; } /** @@ -242,16 +238,6 @@ private static boolean matchConstant(String constant, String formatter, int star return true; } - private static ChronoField minField(List spans) { - ChronoField min = spans.get(0).field; - for (int i = 1; i < spans.size(); i++) { - if (spans.get(i).field.ordinal() < min.ordinal()) { - min = spans.get(i).field; - } - } - return min; - } - private static ChronoField minFieldInRanges(List spans, List ranges) { ChronoField min = null; int start = 0; @@ -270,7 +256,7 @@ private static ChronoField minFieldInRanges(List spans, List ra return min; } - private static TemporalAmount toTemporalAmount(ChronoField field) { + private static TemporalAmount stepOf(ChronoField field) { switch (field) { case SECOND_OF_MINUTE: return Duration.ofSeconds(1); @@ -304,13 +290,9 @@ private static class TimeSpan { private static class Fragment { final String text; final boolean isVariable; - final int start; - final int end; - Fragment(String pattern, int start, int end, boolean isVariable) { - this.start = start; - this.end = end; - this.text = pattern.substring(start, end); + Fragment(String text, boolean isVariable) { + this.text = text; this.isVariable = isVariable; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java index f128e2d6a53e..d941fe9d2763 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainPartitionStepExtractorTest.java @@ -60,8 +60,12 @@ public void testVariablePatternsWithDuration() { @Test public void testVariablePatternsWithPeriod() { - assertThat(extractMinStep("$a$b", "yyyyMMdd")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a-01-$b", "yyyy-MM-dd")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a-01", "yyyy-MM-dd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$y/$m/$day", "yyyy/MM/dd")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a01", "yyyyMMdd")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a$b", "yyyyMMdd")).isEqualTo(Duration.ofDays(1)); assertThat(extractMinStep("202601$a", "yyyyMMdd")).isEqualTo(Duration.ofDays(1)); assertThat(extractMinStep("2026$a01", "yyyyMMdd")).isEqualTo(Period.ofMonths(1)); assertThat(extractMinStep("$a1201", "yyyyMMdd")).isEqualTo(Period.ofYears(1));