From d0ba07cce75bf9acef3587167d7f9ed6deeadbe9 Mon Sep 17 00:00:00 2001 From: ziqi liu Date: Sat, 21 Mar 2026 05:15:53 +0000 Subject: [PATCH 1/3] dev --- .../optimizer/PropagateEmptyRelation.scala | 9 +- .../plans/logical/EmptyRelation.scala | 45 ++++ .../sql/catalyst/trees/TreePatterns.scala | 1 + .../optimizer/OptimizeLimitZeroSuite.scala | 74 ++++-- .../PropagateEmptyRelationSuite.scala | 211 ++++++++++++------ .../sql/execution/EmptyRelationExec.scala | 14 +- .../adaptive/AQEPropagateEmptyRelation.scala | 14 +- .../sql/execution/ui/SparkPlanInfoSuite.scala | 57 ++++- 8 files changed, 318 insertions(+), 107 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index aae092bcb2632..fdbee08a05ebf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeTag -import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, REPARTITION_OPERATION, TRUE_OR_FALSE_LITERAL} +import org.apache.spark.sql.catalyst.trees.TreePattern.{EMPTY_RELATION, LOCAL_RELATION, REPARTITION_OPERATION, TRUE_OR_FALSE_LITERAL} /** * The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, REPARTIT * Right side is non-empty and condition is empty. Eliminate join to its left side. * - Left anti join * Right side is non-empty and condition is empty. Eliminate join to an empty - * [[LocalRelation]]. + * [[EmptyRelation]] so the underlying plan is preserved. * 3. Unary-node Logical Plans * - Project/Filter/Sample with all empty children. * - Limit/Repartition/RepartitionByExpression/Rebalance with all empty children. @@ -50,6 +50,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup protected def isEmpty(plan: LogicalPlan): Boolean = plan match { case p: LocalRelation => p.data.isEmpty + case _: EmptyRelation => true case _ => false } @@ -59,7 +60,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup } protected def empty(plan: LogicalPlan): LogicalPlan = - LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) + EmptyRelation(plan) // Construct a project list from plan's output, while the value is always NULL. private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = @@ -218,7 +219,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup */ object PropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def applyInternal(p: LogicalPlan): LogicalPlan = p.transformUpWithPruning( - _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { + _.containsAnyPattern(LOCAL_RELATION, EMPTY_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { commonApplyFunc } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala index 9e055ae7f3bd8..ce70d6fcc886f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala @@ -19,8 +19,53 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.trees.TreePattern.{EMPTY_RELATION, TreePattern} case class EmptyRelation(logical: LogicalPlan) extends LeafNode { + override val nodePatterns: Seq[TreePattern] = Seq(EMPTY_RELATION) + + override protected def stringArgs: Iterator[Any] = Iterator.empty + + override def generateTreeString( + depth: Int, + lastChildren: java.util.ArrayList[Boolean], + append: String => Unit, + verbose: Boolean, + prefix: String = "", + addSuffix: Boolean = false, + maxFields: Int, + printNodeId: Boolean, + printOutputColumns: Boolean, + indent: Int = 0): Unit = { + super.generateTreeString( + depth, + lastChildren, + append, + verbose, + prefix, + addSuffix, + maxFields, + printNodeId, + printOutputColumns, + indent) + // Nested logical operators are not registered in QueryPlan.localIdMap with physical ids. + Option(logical).foreach { _ => + lastChildren.add(true) + logical.generateTreeString( + depth + 1, + lastChildren, + append, + verbose, + prefix = "", + addSuffix = false, + maxFields, + printNodeId = false, + printOutputColumns, + indent) + lastChildren.remove(lastChildren.size() - 1) + } + } + override val isStreaming: Boolean = logical.isStreaming override val outputOrdering: Seq[SortOrder] = logical.outputOrdering diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 1e22c1ce86539..75ba8eb72df1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -136,6 +136,7 @@ object TreePattern extends Enumeration { val DESERIALIZE_TO_OBJECT: Value = Value val DF_DROP_COLUMNS: Value = Value val DISTINCT_LIKE: Value = Value + val EMPTY_RELATION: Value = Value val EVAL_PYTHON_UDF: Value = Value val EVAL_PYTHON_UDTF: Value = Value val EVENT_TIME_WATERMARK: Value = Value diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index 4ebb7752cc8d0..3ba88209a11b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -22,12 +22,32 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Distinct, EmptyRelation, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types.IntegerType // Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios class OptimizeLimitZeroSuite extends PlanTest { + + /** [[EmptyRelation]] or empty [[LocalRelation]] vs reference `expectedEmptyLocalRelation`. */ + private def assertEmptyRelationOutput( + optimized: LogicalPlan, + expectedEmptyLocalRelation: LocalRelation): Unit = { + val expectedAttrs = expectedEmptyLocalRelation.output + optimized match { + case e: EmptyRelation => + assert( + e.output.map(a => (a.name, a.dataType, a.nullable, a.metadata)) == + expectedAttrs.map(a => (a.name, a.dataType, a.nullable, a.metadata))) + case lr: LocalRelation if lr.data.isEmpty => + comparePlans(optimized, expectedEmptyLocalRelation) + case other => + fail( + s"Expected EmptyRelation or empty LocalRelation, got ${other.getClass.getSimpleName}:\n" + + other) + } + } + object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("OptimizeLimitZero", Once, @@ -45,7 +65,7 @@ class OptimizeLimitZeroSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationOutput(optimized, correctAnswer) } test("Limit 0: individual LocalLimit 0 node") { @@ -54,7 +74,7 @@ class OptimizeLimitZeroSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationOutput(optimized, correctAnswer) } test("Limit 0: individual GlobalLimit 0 node") { @@ -63,25 +83,33 @@ class OptimizeLimitZeroSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationOutput(optimized, correctAnswer) } Seq( - (Inner, LocalRelation($"a".int, $"b".int)), - (LeftOuter, Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1) - .analyze), - (RightOuter, LocalRelation($"a".int, $"b".int)), - (FullOuter, Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1) - .analyze) - ).foreach { case (jt, correctAnswer) => - test(s"Limit 0: for join type $jt") { - val query = testRelation1 - .join(testRelation2.limit(0), joinType = jt, condition = Some($"a".attr === $"b".attr)) - - val optimized = Optimize.execute(query.analyze) - + (Inner, LocalRelation($"a".int, $"b".int), true), + ( + LeftOuter, + Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze, + false), + (RightOuter, LocalRelation($"a".int, $"b".int), true), + ( + FullOuter, + Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze, + false) + ).foreach { case (jt, correctAnswer, expectEmptyRelationOrEmptyLocal) => + test(s"Limit 0: for join type $jt") { + val query = testRelation1 + .join(testRelation2.limit(0), joinType = jt, condition = Some($"a".attr === $"b".attr)) + + val optimized = Optimize.execute(query.analyze) + + if (expectEmptyRelationOrEmptyLocal) { + assertEmptyRelationOutput(optimized, correctAnswer.asInstanceOf[LocalRelation]) + } else { comparePlans(optimized, correctAnswer) } + } } test("Limit 0: 3-way join") { @@ -95,7 +123,7 @@ class OptimizeLimitZeroSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int, $"b".int, $"c".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationOutput(optimized, correctAnswer) } test("Limit 0: intersect") { @@ -103,8 +131,14 @@ class OptimizeLimitZeroSuite extends PlanTest { .intersect(testRelation1.limit(0), isAll = false) val optimized = Optimize.execute(query.analyze) - val correctAnswer = Distinct(LocalRelation($"a".int)) - comparePlans(optimized, correctAnswer) + optimized match { + case Distinct(child: EmptyRelation) => + val correctAnswer = LocalRelation($"a".int) + + assertEmptyRelationOutput(child, correctAnswer) + case other => + fail(s"Expected Distinct(EmptyRelation), got ${other.getClass.getSimpleName}:\n$other") + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 723d0db4f0838..d6f6582e556b7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -24,13 +24,35 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, UnspecifiedFrame} import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, Expand, Join, LocalRelation, LogicalPlan, Project, Repartition} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, MetadataBuilder} class PropagateEmptyRelationSuite extends PlanTest { + + /** [[EmptyRelation]] or empty [[LocalRelation]] vs reference `expectedEmptyLocalRelation`. */ + private def assertEmptyRelationPropagated( + optimized: LogicalPlan, + expectedEmptyLocalRelation: LocalRelation): Unit = { + val expectedAttrs = expectedEmptyLocalRelation.output + optimized match { + case e: EmptyRelation => + assert( + e.output.map(a => (a.name, a.dataType, a.nullable, a.metadata)) == + expectedAttrs.map(a => (a.name, a.dataType, a.nullable, a.metadata)), + s"Output mismatch:\n${e.output}\nvs\n$expectedAttrs") + assert(e.isStreaming == expectedEmptyLocalRelation.isStreaming) + case lr: LocalRelation if lr.data.isEmpty => + comparePlans(optimized, expectedEmptyLocalRelation) + case other => + fail( + s"Expected EmptyRelation or empty LocalRelation, got ${other.getClass.getSimpleName}:\n" + + other) + } + } + object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("PropagateEmptyRelation", FixedPoint(1), @@ -70,7 +92,7 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationPropagated(optimized, correctAnswer) } test("SPARK-32241: remove empty relation children from Union") { @@ -107,8 +129,10 @@ class PropagateEmptyRelationSuite extends PlanTest { } test("propagate empty relation through Join") { - // Testcases are tuples of (left predicate, right predicate, joinType, correct answer) - // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation. + // Testcases are tuples of (left predicate, right predicate, joinType, correct answer). + // `None` compares with OptimizeWithoutPropagateEmptyRelation. + // `Some((plan, expectEmptyRelationOrEmptyLocal))` — if true, `plan` is a reference + // [[LocalRelation]] for [[assertEmptyRelationPropagated]]; otherwise [[comparePlans]]. val testcases = Seq( (true, true, Inner, None), (true, true, Cross, None), @@ -118,37 +142,41 @@ class PropagateEmptyRelationSuite extends PlanTest { (true, true, LeftAnti, None), (true, true, LeftSemi, None), - (true, false, Inner, Some(LocalRelation($"a".int, $"b".int))), - (true, false, Cross, Some(LocalRelation($"a".int, $"b".int))), + (true, false, Inner, Some((LocalRelation($"a".int, $"b".int), true))), + (true, false, Cross, Some((LocalRelation($"a".int, $"b".int), true))), (true, false, LeftOuter, - Some(Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1) - .analyze)), - (true, false, RightOuter, Some(LocalRelation($"a".int, $"b".int))), + Some(( + Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze, + false))), + (true, false, RightOuter, Some((LocalRelation($"a".int, $"b".int), true))), (true, false, FullOuter, - Some(Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1) - .analyze)), - (true, false, LeftAnti, Some(testRelation1)), - (true, false, LeftSemi, Some(LocalRelation($"a".int))), - - (false, true, Inner, Some(LocalRelation($"a".int, $"b".int))), - (false, true, Cross, Some(LocalRelation($"a".int, $"b".int))), - (false, true, LeftOuter, Some(LocalRelation($"a".int, $"b".int))), + Some(( + Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze, + false))), + (true, false, LeftAnti, Some((testRelation1, false))), + (true, false, LeftSemi, Some((LocalRelation($"a".int), true))), + + (false, true, Inner, Some((LocalRelation($"a".int, $"b".int), true))), + (false, true, Cross, Some((LocalRelation($"a".int, $"b".int), true))), + (false, true, LeftOuter, Some((LocalRelation($"a".int, $"b".int), true))), (false, true, RightOuter, - Some(Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2) - .analyze)), + Some(( + Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2).analyze, + false))), (false, true, FullOuter, - Some(Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2) - .analyze)), - (false, true, LeftAnti, Some(LocalRelation($"a".int))), - (false, true, LeftSemi, Some(LocalRelation($"a".int))), - - (false, false, Inner, Some(LocalRelation($"a".int, $"b".int))), - (false, false, Cross, Some(LocalRelation($"a".int, $"b".int))), - (false, false, LeftOuter, Some(LocalRelation($"a".int, $"b".int))), - (false, false, RightOuter, Some(LocalRelation($"a".int, $"b".int))), - (false, false, FullOuter, Some(LocalRelation($"a".int, $"b".int))), - (false, false, LeftAnti, Some(LocalRelation($"a".int))), - (false, false, LeftSemi, Some(LocalRelation($"a".int))) + Some(( + Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2).analyze, + false))), + (false, true, LeftAnti, Some((LocalRelation($"a".int), true))), + (false, true, LeftSemi, Some((LocalRelation($"a".int), true))), + + (false, false, Inner, Some((LocalRelation($"a".int, $"b".int), true))), + (false, false, Cross, Some((LocalRelation($"a".int, $"b".int), true))), + (false, false, LeftOuter, Some((LocalRelation($"a".int, $"b".int), true))), + (false, false, RightOuter, Some((LocalRelation($"a".int, $"b".int), true))), + (false, false, FullOuter, Some((LocalRelation($"a".int, $"b".int), true))), + (false, false, LeftAnti, Some((LocalRelation($"a".int), true))), + (false, false, LeftSemi, Some((LocalRelation($"a".int), true))) ) testcases.foreach { case (left, right, jt, answer) => @@ -156,33 +184,59 @@ class PropagateEmptyRelationSuite extends PlanTest { .where(left) .join(testRelation2.where(right), joinType = jt, condition = Some($"a".attr === $"b".attr)) val optimized = Optimize.execute(query.analyze) - val correctAnswer = - answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) - comparePlans(optimized, correctAnswer) + + answer match { + case None => + comparePlans( + optimized, + OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + case Some((correctAnswer, expectEmptyRelationOrEmptyLocal)) => + if (expectEmptyRelationOrEmptyLocal) { + assertEmptyRelationPropagated( + optimized, + correctAnswer.asInstanceOf[LocalRelation]) + } else { + comparePlans(optimized, correctAnswer) + } + } } } test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") { val testcases = Seq( - (Inner, Some(LocalRelation($"a".int, $"b".int))), - (Cross, Some(LocalRelation($"a".int, $"b".int))), + (Inner, Some((LocalRelation($"a".int, $"b".int), true))), + (Cross, Some((LocalRelation($"a".int, $"b".int), true))), (LeftOuter, - Some(Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1) - .analyze)), + Some(( + Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze, + false))), (RightOuter, - Some(Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2) - .analyze)), + Some(( + Project(Seq(Literal(null).cast(IntegerType).as("a"), $"b"), testRelation2).analyze, + false))), (FullOuter, None), - (LeftAnti, Some(testRelation1)), - (LeftSemi, Some(LocalRelation($"a".int))) + (LeftAnti, Some((testRelation1, false))), + (LeftSemi, Some((LocalRelation($"a".int), true))) ) testcases.foreach { case (jt, answer) => val query = testRelation1.join(testRelation2, joinType = jt, condition = Some(FalseLiteral)) val optimized = Optimize.execute(query.analyze) - val correctAnswer = - answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) - comparePlans(optimized, correctAnswer) + + answer match { + case None => + comparePlans( + optimized, + OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + case Some((correctAnswer, expectEmptyRelationOrEmptyLocal)) => + if (expectEmptyRelationOrEmptyLocal) { + assertEmptyRelationPropagated( + optimized, + correctAnswer.asInstanceOf[LocalRelation]) + } else { + comparePlans(optimized, correctAnswer) + } + } } } @@ -197,7 +251,7 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int) - comparePlans(optimized, correctAnswer) + assertEmptyRelationPropagated(optimized, correctAnswer) } test("propagate empty streaming relation through multiple UnaryNode") { @@ -221,7 +275,7 @@ class PropagateEmptyRelationSuite extends PlanTest { SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true") { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation(output, isStreaming = true) - comparePlans(optimized, correctAnswer) + assertEmptyRelationPropagated(optimized, correctAnswer) } withSQLConf( @@ -267,13 +321,16 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(analyzedQuery) // This is to deal with analysis for join condition. We expect the analyzed plan to contain // filter and projection in batch relation, and know they will go away after optimization. - // The point to check here is that the node is replaced with "empty" LocalRelation, but the + // The point to check here is that the node is replaced with "empty" [[EmptyRelation]], but the // flag `isStreaming` is properly propagated. - val correctAnswer = analyzedQuery transform { - case Project(_, Filter(_, l: LocalRelation)) => l.copy(data = Seq.empty) + optimized.collect { case j: Join => j }.head match { + case Join(_, er: EmptyRelation, LeftOuter, _, _) => + assert(!er.isStreaming) + assert( + er.output.map(a => (a.name, a.dataType)) == outputForBatch.map(a => (a.name, a.dataType))) + case other => + fail(s"expected LeftOuter join with EmptyRelation batch side, got\n$other") } - - comparePlans(optimized, correctAnswer) } test("don't propagate empty streaming relation through agg") { @@ -309,7 +366,6 @@ class PropagateEmptyRelationSuite extends PlanTest { .groupBy($"a")($"a") .orderBy($"a".asc) .select($"a") - comparePlans(optimized, correctAnswer.analyze) } @@ -321,7 +377,7 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) val correctAnswer = LocalRelation($"a".int, $"x".int).analyze - comparePlans(optimized, correctAnswer) + assertEmptyRelationPropagated(optimized, correctAnswer.asInstanceOf[LocalRelation]) } test("don't propagate empty relation through Aggregate without grouping expressions") { @@ -353,22 +409,21 @@ class PropagateEmptyRelationSuite extends PlanTest { LocalRelation.fromExternalRows(Seq($"a".int, $"b".int, $"c".int), Nil)).analyze val optimized = Optimize.execute(query) val expected = LocalRelation.fromExternalRows(Seq($"a".int, $"b".int, $"c".int), Nil) - comparePlans(optimized, expected) + assertEmptyRelationPropagated(optimized, expected) } test("SPARK-37904: Improve rebalance in PropagateEmptyRelation") { val emptyRelation = LocalRelation($"a".int) - val expected = emptyRelation.analyze // test root node val plan1 = emptyRelation.rebalance($"a").analyze val optimized1 = Optimize.execute(plan1) - comparePlans(optimized1, expected) + assertEmptyRelationPropagated(optimized1, emptyRelation) // test non-root node val plan2 = emptyRelation.rebalance($"a").where($"a" > 0).select($"a").analyze val optimized2 = Optimize.execute(plan2) - comparePlans(optimized2, expected) + assertEmptyRelationPropagated(optimized2, emptyRelation) } test("SPARK-39449: Propagate empty relation through Window") { @@ -379,27 +434,29 @@ class PropagateEmptyRelationSuite extends PlanTest { .as("window")) val expected = LocalRelation - .fromExternalRows(Seq($"a".int, $"b".int, $"window".long.withNullability(false)), Nil) - comparePlans(Optimize.execute(originalQuery.analyze), expected.analyze) + .fromExternalRows( + Seq($"a".int, $"b".int, $"window".long.withNullability(false)), + Nil) + assertEmptyRelationPropagated(Optimize.execute(originalQuery.analyze), expected) } test("Propagate empty relation with repartition") { val emptyRelation = LocalRelation($"a".int, $"b".int) - comparePlans(Optimize.execute( - emptyRelation.repartition(1).sortBy($"a".asc).analyze - ), emptyRelation.analyze) + assertEmptyRelationPropagated( + Optimize.execute(emptyRelation.repartition(1).sortBy($"a".asc).analyze), + emptyRelation) - comparePlans(Optimize.execute( - emptyRelation.distribute($"a")(1).sortBy($"a".asc).analyze - ), emptyRelation.analyze) + assertEmptyRelationPropagated( + Optimize.execute(emptyRelation.distribute($"a")(1).sortBy($"a".asc).analyze), + emptyRelation) - comparePlans(Optimize.execute( - emptyRelation.repartition().analyze - ), emptyRelation.analyze) + assertEmptyRelationPropagated( + Optimize.execute(emptyRelation.repartition().analyze), + emptyRelation) - comparePlans(Optimize.execute( - emptyRelation.repartition(1).sortBy($"a".asc).repartition().analyze - ), emptyRelation.analyze) + assertEmptyRelationPropagated( + Optimize.execute(emptyRelation.repartition(1).sortBy($"a".asc).repartition().analyze), + emptyRelation) } test("SPARK-39915: Dataset.repartition(N) may not create N partitions") { @@ -417,7 +474,13 @@ class PropagateEmptyRelationSuite extends PlanTest { comparePlans(Optimize.execute(p4), p4) val p5 = emptyRelation.sortBy("$a".asc).repartition().limit(1).repartition(1).analyze - val expected5 = emptyRelation.repartition(1).analyze - comparePlans(Optimize.execute(p5), expected5) + Optimize.execute(p5) match { + case Repartition(np, sh, er: EmptyRelation) if np == 1 && sh => + assertEmptyRelationPropagated(er, emptyRelation) + case other => + fail( + s"Expected Repartition(1, shuffle=true, EmptyRelation), got " + + s"${other.getClass.getSimpleName}:\n$other") + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 70c9e5359e2ab..2509d0b3c290e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -73,11 +73,21 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo printNodeId, printOutputColumns, indent) + // Preserved logical subtree is not in QueryPlan.localIdMap; avoid simpleStringWithNodeId which + // would print "(unknown)" for every nested logical operator in formatted explain / SQL UI. Option(logical).foreach { _ => lastChildren.add(true) logical.generateTreeString( - depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, - printOutputColumns, indent) + depth + 1, + lastChildren, + append, + verbose, + prefix = "", + addSuffix = false, + maxFields, + printNodeId = false, + printOutputColumns, + indent) lastChildren.remove(lastChildren.size() - 1) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index e2a013b9e814c..c5eb16227331f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL} +import org.apache.spark.sql.catalyst.trees.TreePattern.{EMPTY_RELATION, LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys * compared to [[PropagateEmptyRelationBase]]: * 1. Join is single column NULL-aware anti join (NAAJ) * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an - * empty [[LocalRelation]]. + * empty [[EmptyRelation]]. */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = @@ -40,8 +40,6 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) - override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan) - private def isRootRepartition(plan: LogicalPlan): Boolean = plan match { case l: LogicalQueryStage if l.containsTag(ROOT_REPARTITION) => true case _ => false @@ -95,12 +93,16 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { } override protected def applyInternal(p: LogicalPlan): LogicalPlan = p.transformUpWithPruning( - // LOCAL_RELATION and TRUE_OR_FALSE_LITERAL pattern are matched at + // LOCAL_RELATION, EMPTY_RELATION, and TRUE_OR_FALSE_LITERAL patterns are matched at // `PropagateEmptyRelationBase.commonApplyFunc` // LOGICAL_QUERY_STAGE pattern is matched at `PropagateEmptyRelationBase.commonApplyFunc` // and `AQEPropagateEmptyRelation.eliminateSingleColumnNullAwareAntiJoin` // Note that, We can not specify ruleId here since the LogicalQueryStage is not immutable. - _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL)) { + _.containsAnyPattern( + LOGICAL_QUERY_STAGE, + LOCAL_RELATION, + EMPTY_RELATION, + TRUE_OR_FALSE_LITERAL)) { eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala index 1ef07bf9ebc15..50a09d9373c56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.ui -import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.execution.{FormattedMode, SparkPlanInfo} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession class SparkPlanInfoSuite extends SharedSparkSession { @@ -41,4 +42,58 @@ class SparkPlanInfoSuite extends SharedSparkSession { validateSparkPlanInfo(planInfoResult) } + + test("SparkPlanInfo and plan graph include subtree under EmptyRelation") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + testData + val df = spark.sql("SELECT key FROM testData WHERE key = 0 ORDER BY key, value") + df.collect() + val planInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan) + + def findNamed(info: SparkPlanInfo, name: String): Option[SparkPlanInfo] = { + if (info.nodeName == name) Some(info) + else info.children.view.flatMap(findNamed(_, name)).headOption + } + val emptyInfo = findNamed(planInfo, "EmptyRelation") + assert(emptyInfo.isDefined, s"expected EmptyRelation in plan info: $planInfo") + assert( + emptyInfo.get.children.nonEmpty, + "EmptyRelation SparkPlanInfo should carry the preserved logical plan as children") + + val graph = SparkPlanGraph(planInfo) + val emptyNode = graph.allNodes.find(_.name == "EmptyRelation") + assert( + emptyNode.isDefined, + s"expected EmptyRelation graph node: ${graph.allNodes.map(_.name)}") + val childIds = graph.edges.collect { case e if e.toId == emptyNode.get.id => e.fromId } + assert( + childIds.nonEmpty, + "plan graph should have edges from nested nodes to EmptyRelation") + + val dot = graph.makeDotFile(Map.empty) + assert( + childIds.forall(cid => dot.contains(s"$cid->${emptyNode.get.id}")), + s"DOT should connect preserved-plan nodes to EmptyRelation: $dot") + } + } + + test("formatted explain matches SQL UI plan text: no (unknown) under EmptyRelationExec") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.UI_EXPLAIN_MODE.key -> "formatted") { + testData + val df = spark.sql("SELECT key FROM testData WHERE key = 0 ORDER BY key, value") + df.collect() + val explained = df.queryExecution.explainString(FormattedMode) + assert( + !explained.contains("unknown"), + s"Logical subtree under EmptyRelationExec must not use missing operator ids:\n$explained") + assert( + explained.contains("EmptyRelation"), + s"expected EmptyRelation in formatted physical plan:\n$explained") + } + } } From bcba4930d879b8febb8f576f951566f415bc33b2 Mon Sep 17 00:00:00 2001 From: ziqi liu Date: Wed, 25 Mar 2026 23:15:58 +0000 Subject: [PATCH 2/3] fix test --- .../tpch-plan-stability/q6/explain.txt | 22 ++++++++++--------- .../tpch-plan-stability/q6/simplified.txt | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 13 +++++++---- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/resources/tpch-plan-stability/q6/explain.txt b/sql/core/src/test/resources/tpch-plan-stability/q6/explain.txt index 54153c551bb01..5ada8681a3018 100644 --- a/sql/core/src/test/resources/tpch-plan-stability/q6/explain.txt +++ b/sql/core/src/test/resources/tpch-plan-stability/q6/explain.txt @@ -2,28 +2,30 @@ * HashAggregate (4) +- Exchange (3) +- * HashAggregate (2) - +- * LocalTableScan (1) + +- * EmptyRelation (1) + +- Project [l_extendedprice#1, l_discount#2] + +- LocalRelation , [l_orderkey#3, l_partkey#4, l_suppkey#5, l_linenumber#6, l_quantity#7, l_extendedprice#1, l_discount#2, l_tax#8, l_returnflag#9, l_linestatus#10, l_shipdate#11, l_commitdate#12, l_receiptdate#13, l_shipinstruct#14, l_shipmode#15, l_comment#16] -(1) LocalTableScan [codegen id : 1] +(1) EmptyRelation [codegen id : 1] Output [2]: [l_extendedprice#1, l_discount#2] -Arguments: , [l_extendedprice#1, l_discount#2] +Arguments: [plan_id=1] (2) HashAggregate [codegen id : 1] Input [2]: [l_extendedprice#1, l_discount#2] Keys: [] Functions [1]: [partial_sum((l_extendedprice#1 * l_discount#2))] -Aggregate Attributes [2]: [sum#3, isEmpty#4] -Results [2]: [sum#5, isEmpty#6] +Aggregate Attributes [2]: [sum#17, isEmpty#18] +Results [2]: [sum#19, isEmpty#20] (3) Exchange -Input [2]: [sum#5, isEmpty#6] -Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1] +Input [2]: [sum#19, isEmpty#20] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=2] (4) HashAggregate [codegen id : 2] -Input [2]: [sum#5, isEmpty#6] +Input [2]: [sum#19, isEmpty#20] Keys: [] Functions [1]: [sum((l_extendedprice#1 * l_discount#2))] -Aggregate Attributes [1]: [sum((l_extendedprice#1 * l_discount#2))#7] -Results [1]: [sum((l_extendedprice#1 * l_discount#2))#7 AS revenue#8] +Aggregate Attributes [1]: [sum((l_extendedprice#1 * l_discount#2))#21] +Results [1]: [sum((l_extendedprice#1 * l_discount#2))#21 AS revenue#22] diff --git a/sql/core/src/test/resources/tpch-plan-stability/q6/simplified.txt b/sql/core/src/test/resources/tpch-plan-stability/q6/simplified.txt index 5499f54fa5566..f3282990b8be6 100644 --- a/sql/core/src/test/resources/tpch-plan-stability/q6/simplified.txt +++ b/sql/core/src/test/resources/tpch-plan-stability/q6/simplified.txt @@ -4,4 +4,4 @@ WholeStageCodegen (2) Exchange #1 WholeStageCodegen (1) HashAggregate [l_extendedprice,l_discount] [sum,isEmpty,sum,isEmpty] - LocalTableScan [l_extendedprice,l_discount] + EmptyRelation [l_extendedprice,l_discount] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index fdeed60e19292..68ab7cdb4be9c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -35,7 +35,12 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LocalRelation, LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{ + EmptyRelation, + Filter, + LeafNode, + LogicalPlan, + OneRowRelation} import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -2165,10 +2170,10 @@ class DataFrameSuite extends QueryTest val emptyDf = spark.emptyDataFrame.withColumn("id", lit(1L)) val joined = spark.range(10).join(emptyDf, "id") joined.queryExecution.optimizedPlan match { - case LocalRelation(Seq(id), Nil, _, _) => - assert(id.name == "id") + case e: EmptyRelation => + assert(e.output.map(_.name) == Seq("id")) case _ => - fail("emptyDataFrame should be foldable") + fail("emptyDataFrame should be foldable to EmptyRelation") } } From 1d2235b506a82e8f5621daa1d42e9a814aabfa5a Mon Sep 17 00:00:00 2001 From: ziqi liu Date: Thu, 26 Mar 2026 20:25:44 +0000 Subject: [PATCH 3/3] fix --- .../spark/sql/execution/UnionLoopExec.scala | 10 +++++++++- ...ullWithFalseInPredicateEndToEndSuite.scala | 5 +++-- .../metric/SQLMetricsTestUtils.scala | 19 +++++++++++++++++-- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala index c6d4091fc8092..c0de2d326f612 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ExpressionWithRandomSeed, InterpretedMutableProjection, Literal} import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation.hasUnevaluableExpr import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LocalRelation, LogicalPlan, OneRowRelation, Project, Union, UnionLoopRef} +import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LocalLimit, LocalRelation, LogicalPlan, OneRowRelation, Project, Union, UnionLoopRef} import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.execution.LogicalRDD.rewriteStatsAndConstraints import org.apache.spark.sql.execution.metric.SQLMetrics @@ -116,6 +116,11 @@ case class UnionLoopExec( df.queryExecution.optimizedPlan match { case l: LocalRelation => (df, l.data.length.toLong) + case er: EmptyRelation => + // Avoid the default path (repartition + count + possibly collect), which adds overhead and + // registers an extra SQL execution via Dataset.collect(). + val empty = LocalRelation(er.output, Nil) + (Dataset.ofRows(session, empty), 0L) case Project(projectList, _: OneRowRelation) => if (localRelationLimit != 0 && !projectList.exists(hasUnevaluableExpr)) { val projection = new InterpretedMutableProjection(projectList, Nil) @@ -212,6 +217,9 @@ case class UnionLoopExec( case l: LocalRelation => prevPlan = l l.copy(output = r.output) + case er: EmptyRelation => + prevPlan = er + LocalRelation(r.output, Nil) // This case will be turned into a LocalRelation whenever the flag // SQLConf.CTE_RECURSION_ANCHOR_ROWS_LIMIT_TO_CONVERT_TO_LOCAL_RELATION is set to be // anything larger than 0. However, we still handle this case in a special way to diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala index 8883e9be1937e..a486894e05c1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ReplaceNullWithFalseInPredicateEndToEndSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.{CaseWhen, If, Literal} -import org.apache.spark.sql.execution.LocalTableScanExec +import org.apache.spark.sql.execution.{EmptyRelationExec, LocalTableScanExec} import org.apache.spark.sql.functions.{lit, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -29,8 +29,9 @@ class ReplaceNullWithFalseInPredicateEndToEndSuite extends QueryTest with Shared private def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match { + case _: EmptyRelationExec => case s: LocalTableScanExec => assert(s.rows.isEmpty) - case p => fail(s"$p is not LocalTableScanExec") + case p => fail(s"$p is not EmptyRelationExec or empty LocalTableScanExec") } test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 1df5f522b4b8d..249fe9e2c058d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.metric import java.io.File import scala.collection.mutable.HashMap +import scala.util.Try import org.apache.spark.TestUtils import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} @@ -174,8 +175,22 @@ trait SQLMetricsTestUtils extends SQLTestUtils { } sparkContext.listenerBus.waitUntilEmpty(10000) val executionIds = currentExecutionIds().diff(previousExecutionIds) - assert(executionIds.size === 1) - val executionId = executionIds.head + assert(executionIds.nonEmpty) + val executionId = if (executionIds.size == 1) { + executionIds.head + } else { + // More than one SQL execution can be registered for a single collect() (e.g. some optimizer + // paths). Prefer the execution whose stored plan graph contains all expected metric nodes. + val containing = executionIds.filter { id => + Try(statusStore.planGraph(id).allNodes.map(_.id).toSet).toOption.exists { graphIds => + expectedNodeIds.subsetOf(graphIds) + } + } + assert( + containing.nonEmpty, + s"No new execution among $executionIds contains metric nodes $expectedNodeIds") + containing.max + } val jobs = statusStore.execution(executionId).get.jobs // Use "<=" because there is a race condition that we may miss some jobs // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.