[SPARK-56190][SQL] Support nested partition columns for DSV2 PartitionPredicate #54995
szehon-ho wants to merge 4 commits into apache:master
Conversation
Force-pushed from 302f3b8 to e23797a.
Force-pushed from e23797a to 50d1e68.
@cloud-fan @peter-toth could you help take a look? Thanks
```scala
      expr: Expression,
      partitionFields: Seq[PartitionPredicateField],
      resolver: (String, String) => Boolean): Expression = {
    val partitionAttrs = toAttributes(StructType(partitionFields.map(_.structField)))
```
We call normalizePartitionRefs in a loop, so it would make sense to precompute partitionAttrs and pass it into normalizePartitionRefs().
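The shape of that refactor, sketched with stand-in types (not the PR's real Expression/PartitionPredicateField classes; only the hoisting pattern is the point):

```scala
// Sketch: compute the attribute list once and pass it into each normalization
// call, instead of rebuilding it per filter inside the loop.
case class Attr(name: String)
case class PartField(name: String)

def toAttrs(fields: Seq[PartField]): Seq[Attr] = fields.map(f => Attr(f.name))

// normalize receives the precomputed attrs rather than the raw fields.
def normalize(filter: String, attrs: Seq[Attr]): String =
  s"$filter[${attrs.map(_.name).mkString(",")}]"

val partitionFields = Seq(PartField("s.tz"), PartField("day"))
val partitionAttrs = toAttrs(partitionFields) // hoisted out of the loop
val normalized = Seq("f1", "f2").map(normalize(_, partitionAttrs))
```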
```diff
@@ -178,18 +190,100 @@ object PushDownUtils {
    */
   private def toSupportedPartitionField(
       transform: Transform,
-      relation: DataSourceV2Relation): Option[StructField] = {
+      relationOutput: Seq[AttributeReference]): Option[StructField] = {
     transform match {
-      case t: IdentityTransform if t.ref.fieldNames.length == 1 =>
-        val colName = t.ref.fieldNames.head
-        relation.output
-          .find(_.name == colName)
-          .map(attr => StructField(colName, attr.dataType, attr.nullable))
+      case t: IdentityTransform =>
+        val names = t.ref.fieldNames.toIndexedSeq
+        resolveIdentityPartitionField(names, relationOutput)
       case _ =>
         None
     }
   }
 
+  /**
+   * Resolves an identity partition column path to a StructField.
+   */
+  private def resolveIdentityPartitionField(
+      names: Seq[String],
+      relationOutput: Seq[AttributeReference]): Option[StructField] = {
+    if (names.isEmpty) {
+      None
+    } else {
+      val resolver = SQLConf.get.resolver
+      val rootStruct =
+        StructType(relationOutput.map(a => StructField(a.name, a.dataType, a.nullable)))
+      rootStruct.findNestedField(names, resolver = resolver).map {
+        case (_, leaf) =>
+          StructField(names.mkString("."), leaf.dataType, leaf.nullable)
+      }
+    }
+  }
```
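For illustration, resolveIdentityPartitionField walks a dotted path like `Seq("s", "tz")` into nested structs and flattens the leaf's name. A minimal self-contained sketch of that behavior (stand-in types, not Spark's StructType; the real findNestedField also takes a resolver and an includeCollections flag):

```scala
// Stand-ins for StructType/StructField, just to show how a nested path
// resolves to a leaf field whose name is the flattened "s.tz".
sealed trait DType
case object StringType extends DType
final case class Struct(fields: Seq[SField]) extends DType
final case class SField(name: String, dataType: DType, nullable: Boolean = true)

def findNested(struct: Struct, names: Seq[String]): Option[SField] =
  names match {
    case Seq() => None
    case Seq(last) => struct.fields.find(_.name == last)
    case head +: rest =>
      struct.fields.find(_.name == head).flatMap {
        case SField(_, s: Struct, _) => findNested(s, rest)
        case _ => None // path continues past a non-struct leaf
      }
  }

val root = Struct(Seq(SField("s", Struct(Seq(SField("tz", StringType))))))

// Resolve the path and flatten the name, mirroring resolveIdentityPartitionField.
val resolved = findNested(root, Seq("s", "tz"))
  .map(f => SField(Seq("s", "tz").mkString("."), f.dataType, f.nullable))
```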
Suggested change:

```scala
  def getPartitionSchemaInfo(
      relation: DataSourceV2Relation): Option[Seq[PartitionPredicateField]] = {
    val transforms = relation.table.partitioning
    if (transforms.isEmpty) return None
    val resolver = SQLConf.get.resolver
    val rootStruct =
      StructType(relation.output.map(a => StructField(a.name, a.dataType, a.nullable)))
    val fields = transforms.flatMap {
      case t: IdentityTransform =>
        toSupportedPartitionField(t, rootStruct, resolver).map(PartitionPredicateField(_, t.ref))
      case _ => None
    }
    if (fields.length == transforms.length) Some(fields.toIndexedSeq) else None
  }

  /**
   * Returns a StructField for the given identity partition transform if it is
   * supported for iterative partition predicate push down.
   */
  private def toSupportedPartitionField(
      transform: IdentityTransform,
      rootStruct: StructType,
      resolver: (String, String) => Boolean): Option[StructField] = {
    val names = transform.ref.fieldNames().toIndexedSeq
    if (names.isEmpty) None
    else rootStruct.findNestedField(names, resolver = resolver).map {
      case (_, leaf) => StructField(names.mkString("."), leaf.dataType, leaf.nullable)
    }
  }
```
Thanks, done on the updated branch.
thanks @peter-toth, addressed. FYI I found an issue in the first approach: the normalization of the nested partition field filters, i.e. GetStructField(AttributeRef("parent"), "child") => AttributeRef("parent.child"), messed up the post-scan filters when a filter was rejected, so I'm restoring them to their original form before returning.
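The restore step described above can be sketched as a simple round-trip map (hypothetical toy types; the PR works on Catalyst expressions):

```scala
// Sketch: remember each filter's original nested form keyed by its normalized
// (flattened) form, so rejected filters can be restored before being returned
// as post-scan filters.
case class Expr(repr: String)

// Toy "normalization": rewrite the nested access into a flattened attribute name.
def normalize(e: Expr): Expr =
  Expr(e.repr.replace("GetStructField(parent, child)", "parent.child"))

val original = Seq(Expr("GetStructField(parent, child) = 'UTC'"), Expr("x > 1"))
val normalizedToOriginal: Map[Expr, Expr] =
  original.map(e => normalize(e) -> e).toMap

// Suppose the flattened filter was rejected by the source: map it back.
val rejected = Seq(Expr("parent.child = 'UTC'"))
val restored = rejected.map(normalizedToOriginal)
```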
```scala
        case (_, leaf) => StructField(names.mkString("."), leaf.dataType, leaf.nullable)
      }
    } catch {
      case _: AnalysisException =>
```
This should not happen, but the underlying code does throw an exception if something is not resolvable; in our case we should just skip pushdown, since it should not be fatal.
```diff
-      nonPartitionFilters ++ nonPushable ++ rejectedPartitionFilters
+        p => p.asInstanceOf[PartitionPredicateImpl].expression
+      }.toSeq
+      (nonPartitionFilters ++ nonPushable ++ rejectedPartitionFilters).map(normalizedToOriginal)
```
This is where I return the original (non-flattened) partition filters.
cloud-fan left a comment:

Review Summary

This PR extends DSV2 partition predicate pushdown (SPARK-55596) to support nested partition columns (e.g., PARTITIONED BY (s.tz)). The approach normalizes GetStructField chains to flat AttributeReferences before the existing partition filter machinery runs, then denormalizes the results back for post-scan filters. The design is clean: normalization is localized to pushPartitionPredicates without changing shared utilities.

One correctness concern with the denormalization map lookup, and two minor text fixes from the rename.
```diff
-      nonPartitionFilters ++ nonPushable ++ rejectedPartitionFilters
+        p => p.asInstanceOf[PartitionPredicateImpl].expression
+      }.toSeq
+      (nonPartitionFilters ++ nonPushable ++ rejectedPartitionFilters).map(normalizedToOriginal)
```
`.map(normalizedToOriginal)` can throw NoSuchElementException when getPartitionFiltersAndDataFilters extracts partition sub-expressions from conjunction data filters via extractPredicatesWithinOutputSet. Those extracted sub-expressions are not keys in the normalizedToOriginal map (which only maps top-level expressions).

In practice Spark typically pre-splits conjunctions, so the risk is low, but the code path is reachable for untranslatable conjunction expressions. Consider using `.map(e => normalizedToOriginal.getOrElse(e, e))` to fall back to the expression itself, or filtering out sub-expressions that are not in the map.
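A minimal illustration of the failure mode and the suggested fallback, using a plain Map of strings rather than the PR's actual expression types:

```scala
// The map only contains top-level normalized filters as keys.
val normalizedToOriginal: Map[String, String] =
  Map("a.b = 1" -> "GetStructField(a, b) = 1")

// A sub-expression extracted from a conjunction is not a key in the map.
val subExpr = "a.b > 0"

// normalizedToOriginal(subExpr) would throw NoSuchElementException here.
// getOrElse keeps the expression as-is when there is no mapping:
val safe = normalizedToOriginal.getOrElse(subExpr, subExpr)
```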
```diff
       refs.foreach { ref =>
-        assert(ref.isInstanceOf[PartitionColumnReference],
+        assert(ref.isInstanceOf[PartitionFieldReference],
           s"Expected PartitionColumnReference, got ${ref.getClass.getName}")
```
Assertion message still says PartitionColumnReference after the rename. Same issue at lines 469, 471, and 476.

Suggested change:

```diff
-          s"Expected PartitionColumnReference, got ${ref.getClass.getName}")
+          s"Expected PartitionFieldReference, got ${ref.getClass.getName}")
```
```diff
    * the partition columns (from {@link Table#partitioning()}) referenced by this predicate.
-   * Each reference's {@link PartitionColumnReference#fieldNames()} gives the partition column
-   * name; {@link PartitionColumnReference#ordinal()} gives the 0-based position in
+   * Each reference's {@link PartitionFieldReference#fieldNames()} gives the partition column
```
"partition column" should be "partition field" for consistency with the PartitionColumnReference → PartitionFieldReference rename.

Suggested change:

```diff
-   * Each reference's {@link PartitionFieldReference#fieldNames()} gives the partition column
+   * Each reference's {@link PartitionFieldReference#fieldNames()} gives the partition field
```
```diff
       }
-      val partitionNames = partitionSchema.map(_.name).toSet
+      val partitionNames = partitionFields.map(_.structField.name).toSet
       val refNames = catalystExpr.references.map(_.name).toSet
```
It's an anti-pattern to compare qualified names as single strings. I think one side is from the nested cols in partition predicates, and the other side is reported by the v2 table. How does the v2 table report partition cols?
Yes, the comparison is between

- the Catalyst partition filter
- the v2 table partition cols

Both are flattened (i.e. turned into "a.b.c"):

- using normalizePartitionFilters(), which returns an AttributeReference with the flattened name
- using resolveIdentityPartitionField(), which returns a StructField with the flattened name

The v2 table reports it via Transform, which has transform.ref.fieldNames() returning Seq[String]. But I do need to flatten it for the comparison; do you have any other thoughts?

Another reason for flattening is that later I need to pass the partition schema as a StructType to DataSourceUtils.getPartitionFiltersAndDataFilters. That has some valuable logic (e.g. extracting more partition filters) that I did not want to re-implement.
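Flattening a transform reference for that comparison is just a `mkString` over its field-name path (sketch; `fieldNames` here stands in for `transform.ref.fieldNames()`, and note cloud-fan's caveat above that a literal dot inside a column name would collide with this encoding):

```scala
// transform.ref.fieldNames() yields the path segments of the partition
// reference, e.g. ["s", "tz"] for PARTITIONED BY (s.tz).
val fieldNames: Seq[String] = Seq("s", "tz")

// Flattened form, comparable to the flattened AttributeReference names
// produced on the partition-filter side.
val flattened = fieldNames.mkString(".")
```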
What changes were proposed in this pull request?
Support nested struct fields as partition columns in DSV2 partition predicate pushdown (e.g., PARTITIONED BY (s.tz)).
Why are the changes needed?
DSV2 connectors support nested struct fields as partition fields.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add unit tests to DSV2EnhancedPartitionFilterSuite.
Was this patch authored or co-authored using generative AI tooling?
Yes, Cursor with hand refactoring.