From 50d1e685a4b2d0877919ee887e0304d46891089d Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 24 Mar 2026 18:10:51 -0700 Subject: [PATCH 1/6] [SPARK-56190][SQL] Support nested partition columns for DSV2 PartitionPredicate --- ...ence.java => PartitionFieldReference.java} | 6 +- .../filter/PartitionPredicate.java | 14 +- .../read/SupportsPushDownV2Filters.java | 6 +- ...cala => PartitionFieldReferenceImpl.scala} | 10 +- .../connector/PartitionPredicateField.scala | 32 ++++ .../connector/PartitionPredicateImpl.scala | 46 ++++-- ...InMemoryEnhancedPartitionFilterTable.scala | 11 +- .../PartitionPredicateImplSuite.scala | 57 ++++++- ...upBasedRowLevelOperationScanPlanning.scala | 12 +- .../datasources/v2/PushDownUtils.scala | 156 ++++++++++++++---- .../v2/V2ScanRelationPushDown.scala | 4 +- ...SourceV2EnhancedPartitionFilterSuite.scala | 27 ++- 12 files changed, 292 insertions(+), 89 deletions(-) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{PartitionColumnReference.java => PartitionFieldReference.java} (84%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/{PartitionColumnReferenceImpl.scala => PartitionFieldReferenceImpl.scala} (73%) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PartitionPredicateField.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionColumnReference.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionFieldReference.java similarity index 84% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionColumnReference.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionFieldReference.java index ef516545812b9..43158b2f840a9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionColumnReference.java +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/PartitionFieldReference.java @@ -21,16 +21,16 @@ import org.apache.spark.sql.connector.catalog.Table; /** - * A reference to a partition column in {@link Table#partitioning()}. + * A reference to a partition field in {@link Table#partitioning()}. *

- * {@link #fieldNames()} returns the partition column name (or names) as reported by + * {@link #fieldNames()} returns the partition field name (or names) as reported by * the table's partition schema. * {@link #ordinal()} returns the 0-based position in {@link Table#partitioning()}. * * @since 4.2.0 */ @Evolving -public interface PartitionColumnReference extends NamedReference { +public interface PartitionFieldReference extends NamedReference { /** * Returns the 0-based ordinal of this partition column in {@link Table#partitioning()}. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/PartitionPredicate.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/PartitionPredicate.java index dbc31aa1a4583..940965a89c068 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/PartitionPredicate.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/PartitionPredicate.java @@ -21,9 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.expressions.NamedReference; -import org.apache.spark.sql.connector.expressions.PartitionColumnReference; - -import static org.apache.spark.sql.connector.expressions.Expression.EMPTY_EXPRESSION; +import org.apache.spark.sql.connector.expressions.PartitionFieldReference; /** * Represents a partition predicate that can be evaluated using {@link Table#partitioning()}. @@ -47,17 +45,17 @@ protected PartitionPredicate() { /** * {@inheritDoc} *

- * For PartitionPredicate, returns {@link PartitionColumnReference} instances that identify + * For PartitionPredicate, returns {@link PartitionFieldReference} instances that identify * the partition columns (from {@link Table#partitioning()}) referenced by this predicate. - * Each reference's {@link PartitionColumnReference#fieldNames()} gives the partition column - * name; {@link PartitionColumnReference#ordinal()} gives the 0-based position in + * Each reference's {@link PartitionFieldReference#fieldNames()} gives the partition field + * name; {@link PartitionFieldReference#ordinal()} gives the 0-based position in * {@link Table#partitioning()}. *

* Example: Suppose {@code Table.partitioning()} returns three partition * transforms: {@code [years(ts), months(ts), bucket(32, id)]} with ordinals 0, 1, 2. - * Each {@link PartitionColumnReference} has {@link PartitionColumnReference#fieldNames()} + * Each {@link PartitionFieldReference} has {@link PartitionFieldReference#fieldNames()} * (the transform display name, e.g. {@code years(ts)}) and - * {@link PartitionColumnReference#ordinal()}: + * {@link PartitionFieldReference#ordinal()}: *