Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning
import org.apache.spark.sql.catalyst.util.truncatedString
Expand Down Expand Up @@ -104,11 +106,38 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
}

/**
* Returns the output ordering from the data source if available, otherwise falls back
* to the default (no ordering). This allows data sources to report their natural ordering
* through `SupportsReportOrdering`.
* Returns the output ordering for this scan. When the output partitioning is a
* `KeyedPartitioning`, each partition contains rows where the key expressions evaluate to a
* single constant value (the partition's key), so the data is trivially sorted by those
* expressions within the partition. This structural guarantee holds regardless of whether
* `SupportsReportOrdering` reports any ordering. Key-derived sort orders are prepended to
* whatever `SupportsReportOrdering` reports (with duplicates filtered out).
*/
override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering)
override def outputOrdering: Seq[SortOrder] = {
  val reportedOrdering = ordering.getOrElse(Seq.empty)
  outputPartitioning match {
    case k: KeyedPartitioning =>
      val keyExprs = k.expressions
      // Accumulates non-key equivalent expressions per key position, gathered from reported
      // orderings that overlap with key expressions via `child` or `sameOrderExpressions`.
      val sameExprsPerKey = keyExprs.map(_ => ArrayBuffer.empty[Expression])
      // Records `expr` as order-equivalent to the key at position `i`. Skips expressions that
      // are semantically equal to any key expression (each key already gets its own
      // key-derived SortOrder) or already recorded, so `sameOrderExpressions` actually stays
      // duplicate-free as the scaladoc promises.
      def record(i: Int, expr: Expression): Unit = {
        val buffer = sameExprsPerKey(i)
        if (!keyExprs.exists(_.semanticEquals(expr)) &&
            !buffer.exists(_.semanticEquals(expr))) {
          buffer += expr
        }
      }
      val unmatchedReported = reportedOrdering.filterNot { so =>
        val allExprs = so.child +: so.sameOrderExpressions
        // A single reported SortOrder may overlap several key positions; fold over all of
        // them so each position collects its equivalents, and drop the SortOrder from the
        // reported tail if it matched anywhere.
        keyExprs.zipWithIndex.foldLeft(false) { case (matchedAny, (keyExpr, i)) =>
          val (matched, rest) = allExprs.partition(_.semanticEquals(keyExpr))
          if (matched.nonEmpty) {
            rest.foreach(record(i, _))
            true
          } else matchedAny
        }
      }
      // Key-derived orders come first; whatever the source reported beyond the keys follows.
      val keyOrdering = keyExprs.zip(sameExprsPerKey).map { case (keyExpr, sameExprs) =>
        SortOrder(keyExpr, Ascending, sameOrderExpressions = sameExprs.toSeq)
      }
      keyOrdering ++ unmatchedReported
    case _ => reportedOrdering
  }
}

override def supportsColumnar: Boolean = {
scan.columnarSupportMode() match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,31 @@ case class GroupPartitionsExec(
copy(child = newChild)

override def outputOrdering: Seq[SortOrder] = {
  if (groupedPartitions.forall(_._2.size <= 1)) {
    // No coalescing: each output partition is exactly one input partition, so the child's
    // within-partition ordering (including any key-derived ordering that
    // `DataSourceV2ScanExecBase` already prepended) is fully preserved.
    child.outputOrdering
  } else {
    // Coalescing: multiple input partitions are concatenated into one output partition, which
    // destroys the child's within-partition ordering, so rederive ordering purely from the
    // key expressions. A join may embed multiple `KeyedPartitioning`s (one per join side)
    // within a single expression tree; they share the same partitionKeys but carry different
    // expressions. Collect them all and expose each position's equivalent expressions via
    // `sameOrderExpressions` so the planner can use any of them for ordering checks.
    outputPartitioning match {
      case p: Partitioning with Expression if reducers.isEmpty =>
        // Without reducers all merged partitions share the same original key value, so the
        // key expressions remain constant within the output partition.
        val keyedPartitionings = p.collect { case k: KeyedPartitioning => k }
        keyedPartitionings.map(_.expressions).transpose.map { exprs =>
          SortOrder(exprs.head, Ascending, sameOrderExpressions = exprs.tail)
        }
      case _ =>
        // With reducers, merged partitions share only the reduced key, not the original key
        // expressions, which can take different values within the output partition (this
        // branch also covers partitionings that carry no expression tree to inspect).
        super.outputOrdering
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,14 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
Seq(
// with no partitioning and no order, we expect shuffling AND sorting
(None, None, (true, true), (true, true)),
// partitioned by i and no order, we expect NO shuffling BUT sorting
(Some("i"), None, (false, true), (false, true)),
// partitioned by i and no order,
// we expect NO shuffling AND sorting for groupBy BUT sorting for window function
(Some("i"), None, (false, false), (false, true)),
// partitioned by i and in-partition sorted by i,
// we expect NO shuffling AND sorting for groupBy but sorting for window function
(Some("i"), Some("i"), (false, false), (false, true)),
// partitioned by i and in-partition sorted by j, we expect NO shuffling BUT sorting
(Some("i"), Some("j"), (false, true), (false, true)),
// partitioned by i and in-partition sorted by j, we expect NO shuffling NOR sorting
(Some("i"), Some("j"), (false, false), (false, false)),
// partitioned by i and in-partition sorted by i,j, we expect NO shuffling NOR sorting
(Some("i"), Some("i,j"), (false, false), (false, false)),
// partitioned by j and in-partition sorted by i, we expect shuffling AND sorting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import java.util.Collections
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.Expressions._
import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan}
import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec}
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
Expand Down Expand Up @@ -3568,4 +3568,115 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with
}
}
}

test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") {
  createTable(items, itemsColumns, Array(identity("id")))
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " +
    "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))")

  val query = sql(s"SELECT id, name FROM testcat.ns.$items")
  val scans = collectScans(query.queryExecution.executedPlan)
  assert(scans.size === 1)
  // A scan backed by KeyedPartitioning must expose an ascending sort on the partition key
  // `id` in its outputOrdering, with or without SupportsReportOrdering.
  val sortOrders = scans.head.outputOrdering
  assert(sortOrders.length === 1)
  assert(sortOrders.head.direction === Ascending)
  // V2ExpressionUtils unwraps identity transforms into plain AttributeReferences.
  val sortKey = sortOrders.head.child
  assert(sortKey.isInstanceOf[AttributeReference])
  assert(sortKey.asInstanceOf[AttributeReference].name === "id")
}

test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " +
  "no pre-join SortExec needed before SortMergeJoin") {
  // Non-identical key sets force a GroupPartitionsExec on both sides to align them, but
  // every group holds exactly one partition — no coalescing takes place.
  createTable(items, itemsColumns, Array(identity("id")))
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " +
    "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))")

  createTable(purchases, purchasesColumns, Array(identity("item_id")))
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 100.0, cast('2021-01-01' as timestamp)), " +
    "(2, 200.0, cast('2021-01-01' as timestamp))")

  val joined = sql(
    s"""
       |${selectWithMergeJoinHint("i", "p")}
       |i.id, i.name
       |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
       |""".stripMargin)

  checkAnswer(joined, Seq(Row(1, "aa"), Row(2, "bb")))

  val physicalPlan = joined.queryExecution.executedPlan
  val groupings = collectGroupPartitions(physicalPlan)
  assert(groupings.nonEmpty, "expected GroupPartitionsExec in plan")
  assert(groupings.forall(_.groupedPartitions.forall(_._2.size <= 1)),
    "expected non-coalescing GroupPartitionsExec")
  // The child's key-derived outputOrdering flows through GroupPartitionsExec untouched, and
  // EnsureRequirements inspects outputOrdering directly, so no SortExec may appear below
  // the SMJ.
  val mergeJoins = collect(physicalPlan) { case j: SortMergeJoinExec => j }
  assert(mergeJoins.nonEmpty, "expected SortMergeJoinExec in plan")
  for (smj <- mergeJoins) {
    val preJoinSorts = smj.children.flatMap(c => collect(c) { case s: SortExec => s })
    assert(preJoinSorts.isEmpty,
      "should not add SortExec before SMJ when ordering passes through " +
      "non-coalescing GroupPartitions")
  }
}

test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " +
  "no pre-join SortExec needed before SortMergeJoin") {
  // Key 1 occurs twice on each side, which forces partition coalescing.
  createTable(items, itemsColumns, Array(identity("id")))
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
    "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))")

  createTable(purchases, purchasesColumns, Array(identity("item_id")))
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 100.0, cast('2021-01-01' as timestamp)), " +
    "(1, 110.0, cast('2021-06-01' as timestamp)), " +
    "(2, 200.0, cast('2021-01-01' as timestamp))")

  val joined = sql(
    s"""
       |${selectWithMergeJoinHint("i", "p")}
       |i.id, i.name
       |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
       |""".stripMargin)

  checkAnswer(joined, Seq(
    Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"),
    Row(2, "bb")))

  val physicalPlan = joined.queryExecution.executedPlan
  val groupings = collectGroupPartitions(physicalPlan)
  assert(groupings.nonEmpty, "expected GroupPartitionsExec in plan")
  assert(groupings.exists(_.groupedPartitions.exists(_._2.size > 1)),
    "expected coalescing GroupPartitionsExec")
  // After coalescing, GroupPartitionsExec rederives outputOrdering from the key expressions;
  // EnsureRequirements inspects outputOrdering directly, so no SortExec may appear below
  // the SMJ.
  val mergeJoins = collect(physicalPlan) { case j: SortMergeJoinExec => j }
  assert(mergeJoins.nonEmpty, "expected SortMergeJoinExec in plan")
  for (smj <- mergeJoins) {
    val preJoinSorts = smj.children.flatMap(c => collect(c) { case s: SortExec => s })
    assert(preJoinSorts.isEmpty,
      "should not add SortExec before SMJ when ordering is derived " +
      "from coalesced partition key")
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection}
import org.apache.spark.sql.execution.DummySparkPlan
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.IntegerType

class GroupPartitionsExecSuite extends SharedSparkSession {

  private val exprA = AttributeReference("a", IntegerType)()
  private val exprB = AttributeReference("b", IntegerType)()

  private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a))
  private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b))

  // True iff some output partition was formed by merging more than one input partition.
  private def coalesces(exec: GroupPartitionsExec): Boolean =
    !exec.groupedPartitions.forall(_._2.size <= 1)

  test("SPARK-56241: non-coalescing passes through child ordering unchanged") {
    // Every partition carries a distinct key, so nothing is merged.
    val childOrdering = Seq(SortOrder(exprA, Ascending))
    val scan = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), Seq(row(1), row(2), row(3))),
      outputOrdering = childOrdering)
    val exec = GroupPartitionsExec(scan)

    assert(!coalesces(exec), "expected non-coalescing")
    assert(exec.outputOrdering === childOrdering)
  }

  test("SPARK-56241: coalescing without reducers returns key-derived ordering") {
    // Key 1 shows up in partitions 0 and 2, which forces coalescing.
    val scan = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), Seq(row(1), row(2), row(1))))
    val exec = GroupPartitionsExec(scan)

    assert(coalesces(exec), "expected coalescing")
    val derived = exec.outputOrdering
    assert(derived.length === 1)
    assert(derived.head.child === exprA)
    assert(derived.head.direction === Ascending)
    assert(derived.head.sameOrderExpressions.isEmpty)
  }

  test("SPARK-56241: coalescing without reducers returns one SortOrder per key expression") {
    // Two-column key: (1, 10) shows up in partitions 0 and 2, which forces coalescing.
    val scan = DummySparkPlan(
      outputPartitioning =
        KeyedPartitioning(Seq(exprA, exprB), Seq(row(1, 10), row(2, 20), row(1, 10))))
    val exec = GroupPartitionsExec(scan)

    assert(coalesces(exec), "expected coalescing")
    val derived = exec.outputOrdering
    assert(derived.length === 2)
    assert(derived.head.child === exprA)
    assert(derived(1).child === exprB)
    assert(derived.head.sameOrderExpressions.isEmpty)
    assert(derived(1).sameOrderExpressions.isEmpty)
  }

  test("SPARK-56241: coalescing join case exposes sameOrderExpressions across join sides") {
    // A PartitioningCollection wraps one KeyedPartitioning per join side; both share the
    // same partition keys. Key 1 coalesces partitions 0 and 2.
    val keys = Seq(row(1), row(2), row(1))
    val scan = DummySparkPlan(outputPartitioning = PartitioningCollection(
      Seq(KeyedPartitioning(Seq(exprA), keys), KeyedPartitioning(Seq(exprB), keys))))
    val exec = GroupPartitionsExec(scan)

    assert(coalesces(exec), "expected coalescing")
    val derived = exec.outputOrdering
    assert(derived.length === 1)
    assert(derived.head.child === exprA)
    assert(derived.head.sameOrderExpressions === Seq(exprB))
  }

  test("SPARK-56241: coalescing with reducers returns empty ordering") {
    // Reduced keys are constant within a merged partition, but the original key expressions
    // are not, so outputOrdering falls back to the default (empty).
    val scan = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), Seq(row(1), row(2), row(1))))
    // A None element means an identity reducer; what matters here is that reducers.isDefined,
    // which triggers the fallback.
    val exec = GroupPartitionsExec(scan, reducers = Some(Seq(None)))

    assert(coalesces(exec), "expected coalescing")
    assert(exec.outputOrdering === Nil)
  }
}