From 14188ec034692f9d823c6a8b541e1d2130909a6e Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 26 Mar 2026 17:36:44 +0100 Subject: [PATCH 1/2] [SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Within a `KeyedPartitioning` partition, all rows share the same key value, so the key expressions are trivially sorted (ascending) within each partition. This PR makes two plan nodes expose that structural guarantee via `outputOrdering`: - **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a `KeyedPartitioning`, prepend one ascending `SortOrder` per key expression to whatever `SupportsReportOrdering` reports, merging overlapping `sameOrderExpressions` in a single pass. - **`GroupPartitionsExec`**: - *Non-coalescing* (every group has ≤ 1 input partition): pass through `child.outputOrdering` unchanged. - *Coalescing without reducers*: re-derive ordering from the output `KeyedPartitioning` key expressions; a join may embed multiple `KeyedPartitioning`s with different expressions — expose equivalences via `sameOrderExpressions`. - *Coalescing with reducers*: fall back to `super.outputOrdering` (empty), because merged partitions share only the reduced key. ### Why are the changes needed? Before this change, `outputOrdering` on both nodes returned an empty sequence (unless `SupportsReportOrdering` was implemented), even though the within- partition ordering was structurally guaranteed by the partitioning itself. As a result, `EnsureRequirements` would insert a redundant `SortExec` before `SortMergeJoin` inputs that are already in key order. ### Does this PR introduce _any_ user-facing change? Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add a redundant `SortExec` before `SortMergeJoin` when the join keys match the partition keys, reducing CPU and memory overhead. 
### How was this patch tested? - New unit test class `GroupPartitionsExecSuite` covering all four `outputOrdering` branches (non-coalescing, coalescing without reducers with single and multi-key, join `sameOrderExpressions`, coalescing with reducers). - New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241): - Scan with `KeyedPartitioning` reports key-derived `outputOrdering`. - Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes through child ordering — no pre-join `SortExec`. - Coalescing `GroupPartitionsExec` derives ordering from key expressions — no pre-join `SortExec`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.6 --- .../v2/DataSourceV2ScanExecBase.scala | 39 +++++- .../datasources/v2/GroupPartitionsExec.scala | 24 +++- .../KeyGroupedPartitioningSuite.scala | 115 +++++++++++++++++- .../v2/GroupPartitionsExecSuite.scala | 106 ++++++++++++++++ 4 files changed, 275 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 877e65341c1c8..c5a11abe60502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder} import 
org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning import org.apache.spark.sql.catalyst.util.truncatedString @@ -104,11 +106,38 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { } /** - * Returns the output ordering from the data source if available, otherwise falls back - * to the default (no ordering). This allows data sources to report their natural ordering - * through `SupportsReportOrdering`. + * Returns the output ordering for this scan. When the output partitioning is a + * `KeyedPartitioning`, each partition contains rows where the key expressions evaluate to a + * single constant value (the partition's key), so the data is trivially sorted by those + * expressions within the partition. This structural guarantee holds regardless of whether + * `SupportsReportOrdering` reports any ordering. Key-derived sort orders are prepended to + * whatever `SupportsReportOrdering` reports (with duplicates filtered out). */ - override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering) + override def outputOrdering: Seq[SortOrder] = { + val reportedOrdering = ordering.getOrElse(Seq.empty) + outputPartitioning match { + case k: KeyedPartitioning => + val keyExprs = k.expressions + // Accumulates non-key equivalent expressions per key position, gathered from reported + // orderings that overlap with key expressions via `child` or `sameOrderExpressions`. 
+ val sameExprsPerKey = keyExprs.map(_ => ArrayBuffer.empty[Expression]) + val unmatchedReported = reportedOrdering.filterNot { so => + val allExprs = so.child +: so.sameOrderExpressions + keyExprs.zipWithIndex.foldLeft(false) { case (matchedAny, (keyExpr, i)) => + val (matched, rest) = allExprs.partition(_.semanticEquals(keyExpr)) + if (matched.nonEmpty) { + sameExprsPerKey(i) ++= rest + true + } else matchedAny + } + } + val keyOrdering = keyExprs.zip(sameExprsPerKey).map { case (keyExpr, sameExprs) => + SortOrder(keyExpr, Ascending, sameOrderExpressions = sameExprs.toSeq) + } + keyOrdering ++ unmatchedReported + case _ => reportedOrdering + } + } override def supportsColumnar: Boolean = { scan.columnarSupportMode() match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 7ed394df8b300..7a09afddef49c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -184,11 +184,31 @@ case class GroupPartitionsExec( copy(child = newChild) override def outputOrdering: Seq[SortOrder] = { - // when multiple partitions are grouped together, ordering inside partitions is not preserved if (groupedPartitions.forall(_._2.size <= 1)) { + // No coalescing: each output partition is exactly one input partition. The child's + // within-partition ordering is fully preserved (including any key-derived ordering that + // `DataSourceV2ScanExecBase` already prepended). child.outputOrdering } else { - super.outputOrdering + // Coalescing: multiple input partitions are merged into one output partition. The child's + // within-partition ordering is lost due to concatenation, so we rederive ordering purely from + // the key expressions. 
A join may embed multiple `KeyedPartitioning`s (one per join side) + // within a single expression tree; they share the same partitionKeys but carry different + // expressions. Collect them all and expose each position's equivalent expressions via + // `sameOrderExpressions` so the planner can use any of them for ordering checks. + outputPartitioning match { + case p: Partitioning with Expression if reducers.isEmpty => + // Without reducers all merged partitions share the same original key value, so the key + // expressions remain constant within the output partition. + val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } + keyedPartitionings.map(_.expressions).transpose.map { exprs => + SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail) + } + case _ => + // With reducers, merged partitions share only the reduced key, not the original key + // expressions, which can take different values within the output partition. + super.outputOrdering + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 688196b47502e..43b9b058bdee3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -22,14 +22,14 @@ import java.util.Collections import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import 
org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.expressions.Expressions._ -import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan} +import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec} import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -3568,4 +3568,115 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } } + + test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") { + val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " + + "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))") + + val df = sql(s"SELECT id, name FROM testcat.ns.$items") + val plan = df.queryExecution.executedPlan + val scans = collectScans(plan) + assert(scans.size === 1) + // The scan's outputOrdering should include an ascending sort on the partition key `id`, + // derived from the KeyedPartitioning - regardless of SupportsReportOrdering. + val ordering = scans.head.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.direction === Ascending) + // identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils. 
+    val keyExpr = ordering.head.child +    assert(keyExpr.isInstanceOf[AttributeReference]) +    assert(keyExpr.asInstanceOf[AttributeReference].name === "id") +  } + +  test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " + +    "no pre-join SortExec needed before SortMergeJoin") { +    // Non-identical key sets force GroupPartitionsExec to be inserted on both sides to align them, +    // but each group has exactly one partition — no coalescing. +    val items_partitions = Array(identity("id")) +    createTable(items, itemsColumns, items_partitions) +    sql(s"INSERT INTO testcat.ns.$items VALUES " + +      "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + +      "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " + +      "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))") + +    val purchases_partitions = Array(identity("item_id")) +    createTable(purchases, purchasesColumns, purchases_partitions) +    sql(s"INSERT INTO testcat.ns.$purchases VALUES " + +      "(1, 100.0, cast('2021-01-01' as timestamp)), " + +      "(2, 200.0, cast('2021-01-01' as timestamp))") + +    val df = sql( +      s""" +         |${selectWithMergeJoinHint("i", "p")} +         |i.id, i.name +         |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id +         |""".stripMargin) + +    checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb"))) + +    val plan = df.queryExecution.executedPlan +    val groupPartitions = collectGroupPartitions(plan) +    assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") +    assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)), +      "expected non-coalescing GroupPartitionsExec") +    // GroupPartitionsExec passes through the child's key-derived outputOrdering. +    // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before +    // the SMJ.
+ val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " + + "non-coalescing GroupPartitions") + } + } + + test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " + + "no pre-join SortExec needed before SortMergeJoin") { + // Duplicate key 1 on both sides causes coalescing. + val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + + "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 100.0, cast('2021-01-01' as timestamp)), " + + "(1, 110.0, cast('2021-06-01' as timestamp)), " + + "(2, 200.0, cast('2021-01-01' as timestamp))") + + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq( + Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"), + Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)), + "expected coalescing GroupPartitionsExec") + // GroupPartitionsExec derives outputOrdering from the key expressions after coalescing. + // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before + // the SMJ. 
+ val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " + + "from coalesced partition key") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala new file mode 100644 index 0000000000000..012c0b1027d49 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExecSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection} +import org.apache.spark.sql.execution.DummySparkPlan +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.IntegerType + +class GroupPartitionsExecSuite extends SharedSparkSession { + + private val exprA = AttributeReference("a", IntegerType)() + private val exprB = AttributeReference("b", IntegerType)() + + private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a)) + private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b)) + + test("SPARK-56241: non-coalescing passes through child ordering unchanged") { + // Each partition has a distinct key — no coalescing happens. + val partitionKeys = Seq(row(1), row(2), row(3)) + val childOrdering = Seq(SortOrder(exprA, Ascending)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys), + outputOrdering = childOrdering) + val gpe = GroupPartitionsExec(child) + + assert(gpe.groupedPartitions.forall(_._2.size <= 1), "expected non-coalescing") + assert(gpe.outputOrdering === childOrdering) + } + + test("SPARK-56241: coalescing without reducers returns key-derived ordering") { + // Key 1 appears on partitions 0 and 2, causing coalescing. 
+ val partitionKeys = Seq(row(1), row(2), row(1)) + val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys)) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.direction === Ascending) + assert(ordering.head.sameOrderExpressions.isEmpty) + } + + test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") { + // Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing. + val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10)) + val child = DummySparkPlan( + outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys)) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 2) + assert(ordering.head.child === exprA) + assert(ordering(1).child === exprB) + assert(ordering.head.sameOrderExpressions.isEmpty) + assert(ordering(1).sameOrderExpressions.isEmpty) + } + + test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") { + // PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same + // partition keys. Key 1 coalesces partitions 0 and 2. 
+ val partitionKeys = Seq(row(1), row(2), row(1)) + val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys) + val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys) + val child = DummySparkPlan(outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP))) + val gpe = GroupPartitionsExec(child) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + val ordering = gpe.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.child === exprA) + assert(ordering.head.sameOrderExpressions === Seq(exprB)) + } + + test("SPARK-56241: coalescing with reducers returns empty ordering") { + // When reducers are present, the original key expressions are not constant within the merged + // partition, so outputOrdering falls back to the default (empty). + val partitionKeys = Seq(row(1), row(2), row(1)) + val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys)) + // reducers = Some(Seq(None)) - None element means identity reducer; the important thing is + // that reducers.isDefined, which triggers the fallback. 
+ val gpe = GroupPartitionsExec(child, reducers = Some(Seq(None))) + + assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing") + assert(gpe.outputOrdering === Nil) + } +} From 4260f53c11e8fe60726959319d1e0595fc1a1112 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 26 Mar 2026 20:36:25 +0100 Subject: [PATCH 2/2] fix expected test output --- .../apache/spark/sql/connector/DataSourceV2Suite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index a09b7e0827c49..38602e7e0073a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -313,13 +313,14 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS Seq( // with no partitioning and no order, we expect shuffling AND sorting (None, None, (true, true), (true, true)), - // partitioned by i and no order, we expect NO shuffling BUT sorting - (Some("i"), None, (false, true), (false, true)), + // partitioned by i and no order, + // we expect NO shuffling AND sorting for groupBy BUT sorting for window function + (Some("i"), None, (false, false), (false, true)), // partitioned by i and in-partition sorted by i, // we expect NO shuffling AND sorting for groupBy but sorting for window function (Some("i"), Some("i"), (false, false), (false, true)), - // partitioned by i and in-partition sorted by j, we expect NO shuffling BUT sorting - (Some("i"), Some("j"), (false, true), (false, true)), + // partitioned by i and in-partition sorted by j, we expect NO shuffling NOR sorting + (Some("i"), Some("j"), (false, false), (false, false)), // partitioned by i and in-partition sorted by i,j, we expect NO shuffling NOR sorting (Some("i"), Some("i,j"), (false, false), (false, 
false)), // partitioned by j and in-partition sorted by i, we expect shuffling AND sorting