Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, REPARTITION_OPERATION, TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.catalyst.trees.TreePattern.{EMPTY_RELATION, LOCAL_RELATION, REPARTITION_OPERATION, TRUE_OR_FALSE_LITERAL}

/**
* The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with
Expand All @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, REPARTIT
* Right side is non-empty and condition is empty. Eliminate join to its left side.
* - Left anti join
* Right side is non-empty and condition is empty. Eliminate join to an empty
* [[LocalRelation]].
* [[EmptyRelation]] so the underlying plan is preserved.
* 3. Unary-node Logical Plans
* - Project/Filter/Sample with all empty children.
* - Limit/Repartition/RepartitionByExpression/Rebalance with all empty children.
Expand All @@ -50,6 +50,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup

/**
 * Reports whether `plan` is recognized as empty by this rule.
 *
 * An [[EmptyRelation]] is always empty; a [[LocalRelation]] is empty when it carries no
 * data rows. Every other plan is reported as non-empty.
 */
protected def isEmpty(plan: LogicalPlan): Boolean = plan match {
  case _: EmptyRelation => true
  case local: LocalRelation => local.data.isEmpty
  case _ => false
}

Expand All @@ -59,7 +60,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
}

protected def empty(plan: LogicalPlan): LogicalPlan =
LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
EmptyRelation(plan)

// Construct a project list from plan's output, while the value is always NULL.
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
Expand Down Expand Up @@ -218,7 +219,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
*/
object PropagateEmptyRelation extends PropagateEmptyRelationBase {
override protected def applyInternal(p: LogicalPlan): LogicalPlan = p.transformUpWithPruning(
_.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
_.containsAnyPattern(LOCAL_RELATION, EMPTY_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
commonApplyFunc
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,53 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.trees.TreePattern.{EMPTY_RELATION, TreePattern}

case class EmptyRelation(logical: LogicalPlan) extends LeafNode {
// Tags this node with the EMPTY_RELATION tree pattern.
override val nodePatterns: Seq[TreePattern] = Seq(EMPTY_RELATION)

// Yields no string arguments for this node.
// NOTE(review): presumably this keeps the wrapped `logical` plan out of the node's one-line
// description, since generateTreeString prints it as a nested subtree instead -- confirm.
override protected def stringArgs: Iterator[Any] = Iterator.empty

/**
 * Appends this node's tree string via the parent implementation, then appends the wrapped
 * `logical` plan one level deeper so that it shows up beneath this node.
 *
 * The wrapped plan is always rendered as a last child, with an empty prefix, without a
 * suffix and without node ids. Nothing extra is appended when `logical` is null.
 */
override def generateTreeString(
    depth: Int,
    lastChildren: java.util.ArrayList[Boolean],
    append: String => Unit,
    verbose: Boolean,
    prefix: String = "",
    addSuffix: Boolean = false,
    maxFields: Int,
    printNodeId: Boolean,
    printOutputColumns: Boolean,
    indent: Int = 0): Unit = {
  // Render this node itself first.
  super.generateTreeString(
    depth, lastChildren, append, verbose, prefix, addSuffix,
    maxFields, printNodeId, printOutputColumns, indent)

  if (logical != null) {
    val nestedDepth = depth + 1
    // The wrapped plan is the only nested entry, so mark it as the last child while it is
    // being rendered and drop the marker again afterwards.
    lastChildren.add(true)
    // Node ids are switched off here: nested logical operators are not registered in
    // QueryPlan.localIdMap with physical ids.
    logical.generateTreeString(
      nestedDepth,
      lastChildren,
      append,
      verbose,
      prefix = "",
      addSuffix = false,
      maxFields,
      printNodeId = false,
      printOutputColumns,
      indent)
    lastChildren.remove(lastChildren.size() - 1)
  }
}

// The streaming flag is taken from the wrapped logical plan.
override val isStreaming: Boolean = logical.isStreaming

// The output ordering is delegated to the wrapped logical plan.
override val outputOrdering: Seq[SortOrder] = logical.outputOrdering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ object TreePattern extends Enumeration {
val DESERIALIZE_TO_OBJECT: Value = Value
val DF_DROP_COLUMNS: Value = Value
val DISTINCT_LIKE: Value = Value
val EMPTY_RELATION: Value = Value
val EVAL_PYTHON_UDF: Value = Value
val EVAL_PYTHON_UDTF: Value = Value
val EVENT_TIME_WATERMARK: Value = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,32 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, EmptyRelation, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.IntegerType

// Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios
class OptimizeLimitZeroSuite extends PlanTest {

/**
 * Asserts that `optimized` is an empty relation matching `expectedEmptyLocalRelation`.
 *
 * An [[EmptyRelation]] passes when its output attributes agree with the expected ones on
 * name, data type, nullability and metadata. An empty [[LocalRelation]] is checked with
 * `comparePlans`. Any other plan fails the test.
 */
private def assertEmptyRelationOutput(
    optimized: LogicalPlan,
    expectedEmptyLocalRelation: LocalRelation): Unit = optimized match {
  case emptyRel: EmptyRelation =>
    // Only these four attribute fields take part in the comparison.
    val actualSignature = emptyRel.output.map { attr =>
      (attr.name, attr.dataType, attr.nullable, attr.metadata)
    }
    val expectedSignature = expectedEmptyLocalRelation.output.map { attr =>
      (attr.name, attr.dataType, attr.nullable, attr.metadata)
    }
    assert(actualSignature == expectedSignature)

  case emptyLocal: LocalRelation if emptyLocal.data.isEmpty =>
    comparePlans(emptyLocal, expectedEmptyLocalRelation)

  case other =>
    val header =
      s"Expected EmptyRelation or empty LocalRelation, got ${other.getClass.getSimpleName}:\n"
    fail(header + other)
}

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("OptimizeLimitZero", Once,
Expand All @@ -45,7 +65,7 @@ class OptimizeLimitZeroSuite extends PlanTest {
val optimized = Optimize.execute(query.analyze)
val correctAnswer = LocalRelation($"a".int)

comparePlans(optimized, correctAnswer)
assertEmptyRelationOutput(optimized, correctAnswer)
}

test("Limit 0: individual LocalLimit 0 node") {
Expand All @@ -54,7 +74,7 @@ class OptimizeLimitZeroSuite extends PlanTest {
val optimized = Optimize.execute(query.analyze)
val correctAnswer = LocalRelation($"a".int)

comparePlans(optimized, correctAnswer)
assertEmptyRelationOutput(optimized, correctAnswer)
}

test("Limit 0: individual GlobalLimit 0 node") {
Expand All @@ -63,25 +83,33 @@ class OptimizeLimitZeroSuite extends PlanTest {
val optimized = Optimize.execute(query.analyze)
val correctAnswer = LocalRelation($"a".int)

comparePlans(optimized, correctAnswer)
assertEmptyRelationOutput(optimized, correctAnswer)
}

Seq(
(Inner, LocalRelation($"a".int, $"b".int)),
(LeftOuter, Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1)
.analyze),
(RightOuter, LocalRelation($"a".int, $"b".int)),
(FullOuter, Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1)
.analyze)
).foreach { case (jt, correctAnswer) =>
test(s"Limit 0: for join type $jt") {
val query = testRelation1
.join(testRelation2.limit(0), joinType = jt, condition = Some($"a".attr === $"b".attr))

val optimized = Optimize.execute(query.analyze)

(Inner, LocalRelation($"a".int, $"b".int), true),
(
LeftOuter,
Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze,
false),
(RightOuter, LocalRelation($"a".int, $"b".int), true),
(
FullOuter,
Project(Seq($"a", Literal(null).cast(IntegerType).as("b")), testRelation1).analyze,
false)
).foreach { case (jt, correctAnswer, expectEmptyRelationOrEmptyLocal) =>
test(s"Limit 0: for join type $jt") {
val query = testRelation1
.join(testRelation2.limit(0), joinType = jt, condition = Some($"a".attr === $"b".attr))

val optimized = Optimize.execute(query.analyze)

if (expectEmptyRelationOrEmptyLocal) {
assertEmptyRelationOutput(optimized, correctAnswer.asInstanceOf[LocalRelation])
} else {
comparePlans(optimized, correctAnswer)
}
}
}

test("Limit 0: 3-way join") {
Expand All @@ -95,16 +123,22 @@ class OptimizeLimitZeroSuite extends PlanTest {
val optimized = Optimize.execute(query.analyze)
val correctAnswer = LocalRelation($"a".int, $"b".int, $"c".int)

comparePlans(optimized, correctAnswer)
assertEmptyRelationOutput(optimized, correctAnswer)
}

test("Limit 0: intersect") {
val query = testRelation1
.intersect(testRelation1.limit(0), isAll = false)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = Distinct(LocalRelation($"a".int))

comparePlans(optimized, correctAnswer)
optimized match {
case Distinct(child: EmptyRelation) =>
val correctAnswer = LocalRelation($"a".int)

assertEmptyRelationOutput(child, correctAnswer)
case other =>
fail(s"Expected Distinct(EmptyRelation), got ${other.getClass.getSimpleName}:\n$other")
}
}
}
Loading