diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3c210ca7d985b..1b6fbd2b7c780 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5714,6 +5714,18 @@ ], "sqlState" : "38000" }, + "QUALIFY_AGGREGATE_NOT_ALLOWED" : { + "message" : [ + "Aggregate functions are not supported in QUALIFY: <aggregateExpr>." + ], + "sqlState" : "42903" + }, + "QUALIFY_REQUIRES_WINDOW_FUNCTION" : { + "message" : [ + "The QUALIFY clause requires at least one window function in the current SELECT list or the QUALIFY condition." + ], + "sqlState" : "42903" + }, "RECURSION_LEVEL_LIMIT_EXCEEDED" : { "message" : [ "Recursion level limit reached but query has not exhausted, try increasing it like 'WITH RECURSIVE t(col) MAX RECURSION LEVEL 200'." diff --git a/docs/sql-ref-syntax-qry-select-qualify.md b/docs/sql-ref-syntax-qry-select-qualify.md new file mode 100644 index 0000000000000..ca4955b6e4321 --- /dev/null +++ b/docs/sql-ref-syntax-qry-select-qualify.md @@ -0,0 +1,98 @@ +--- +layout: global +title: QUALIFY Clause +displayTitle: QUALIFY Clause +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--- + +### Description + +The `QUALIFY` clause filters rows after window functions have been evaluated. +It can refer to window functions in the `SELECT` list by alias, or define window +functions directly in the `QUALIFY` condition. + +### Syntax + +```sql +QUALIFY boolean_expression +``` + +### Parameters + +* **boolean_expression** + + Specifies any expression that evaluates to a result type `boolean`. Two or + more expressions may be combined together using the logical + operators ( `AND`, `OR` ). + + **Note** + + The current query's `SELECT` list or the `QUALIFY` condition must contain at least + one window function. Aggregate functions are not allowed in the `QUALIFY` condition. + +### Examples + +```sql +CREATE TABLE dealer (id INT, city STRING, car_model STRING, quantity INT); +INSERT INTO dealer VALUES + (100, 'Fremont', 'Honda Civic', 10), + (100, 'Fremont', 'Honda Accord', 15), + (100, 'Fremont', 'Honda CRV', 7), + (200, 'Dublin', 'Honda Civic', 20), + (200, 'Dublin', 'Honda Accord', 10), + (200, 'Dublin', 'Honda CRV', 3), + (300, 'San Jose', 'Honda Civic', 5), + (300, 'San Jose', 'Honda Accord', 8); + +-- `QUALIFY` clause referring to a window function in the `SELECT` list by alias. +SELECT city, car_model, RANK() OVER (PARTITION BY car_model ORDER BY quantity) AS rank +FROM dealer +QUALIFY rank = 1; ++--------+------------+----+ +| city| car_model|rank| ++--------+------------+----+ +|San Jose|Honda Accord| 1| +| Dublin| Honda CRV| 1| +|San Jose| Honda Civic| 1| ++--------+------------+----+ + +-- `QUALIFY` clause with a window function directly in the predicate. 
+SELECT city, car_model +FROM dealer +QUALIFY RANK() OVER (PARTITION BY car_model ORDER BY quantity) = 1; ++--------+------------+ +| city| car_model| ++--------+------------+ +|San Jose|Honda Accord| +| Dublin| Honda CRV| +|San Jose| Honda Civic| ++--------+------------+ +``` + +### Related Statements + +* [SELECT Main](sql-ref-syntax-qry-select.html) +* [WHERE Clause](sql-ref-syntax-qry-select-where.html) +* [GROUP BY Clause](sql-ref-syntax-qry-select-groupby.html) +* [HAVING Clause](sql-ref-syntax-qry-select-having.html) +* [WINDOW Clause](sql-ref-syntax-qry-select-window.html) +* [ORDER BY Clause](sql-ref-syntax-qry-select-orderby.html) +* [SORT BY Clause](sql-ref-syntax-qry-select-sortby.html) +* [CLUSTER BY Clause](sql-ref-syntax-qry-select-clusterby.html) +* [DISTRIBUTE BY Clause](sql-ref-syntax-qry-select-distribute-by.html) +* [LIMIT Clause](sql-ref-syntax-qry-select-limit.html) +* [OFFSET Clause](sql-ref-syntax-qry-select-offset.html) diff --git a/docs/sql-ref-syntax-qry-select.md b/docs/sql-ref-syntax-qry-select.md index 1d5532898c654..3a0fb60f30130 100644 --- a/docs/sql-ref-syntax-qry-select.md +++ b/docs/sql-ref-syntax-qry-select.md @@ -49,6 +49,8 @@ SELECT [ hints , ... ] [ ALL | DISTINCT ] { [ [ named_expression | regex_column_ [ WHERE boolean_expression ] [ GROUP BY expression [ , ... ] ] [ HAVING boolean_expression ] + [ WINDOW clause ] + [ QUALIFY boolean_expression ] ``` ### Parameters @@ -122,6 +124,12 @@ SELECT [ hints , ... ] [ ALL | DISTINCT ] { [ [ named_expression | regex_column_ filter rows after the grouping is performed. If HAVING is specified without GROUP BY, it indicates a GROUP BY without grouping expressions (global aggregate). +* **QUALIFY** + + Filters rows after window functions have been evaluated. The current `SELECT` list or the + `QUALIFY` condition must contain at least one window function, and aggregate functions are + not allowed inside the `QUALIFY` condition. 
+ * **ORDER BY** Specifies an ordering of the rows of the complete result set of the query. The output rows are ordered diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 34de788c6d464..b0008525ea8e0 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -399,6 +399,7 @@ PROCEDURE: 'PROCEDURE'; PROCEDURES: 'PROCEDURES'; PROPERTIES: 'PROPERTIES'; PURGE: 'PURGE'; +QUALIFY: 'QUALIFY'; QUARTER: 'QUARTER'; QUERY: 'QUERY'; RANGE: 'RANGE'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index a1de1234ef317..549726cf5b6c5 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -795,6 +795,7 @@ fromStatementBody aggregationClause? havingClause? windowClause? + qualifyClause? queryOrganization ; @@ -812,7 +813,8 @@ querySpecification whereClause? aggregationClause? havingClause? - windowClause? #regularQuerySpecification + windowClause? + qualifyClause? #regularQuerySpecification ; transformClause @@ -883,6 +885,10 @@ havingClause : HAVING booleanExpression ; +qualifyClause + : QUALIFY booleanExpression + ; + hint : HENT_START hintStatements+=hintStatement (COMMA? 
hintStatements+=hintStatement)* HENT_END ; @@ -2118,6 +2124,7 @@ ansiNonReserved | PROCEDURES | PROPERTIES | PURGE + | QUALIFY | QUARTER | QUERY | RANGE @@ -2530,6 +2537,7 @@ nonReserved | PROCEDURES | PROPERTIES | PURGE + | QUALIFY | QUARTER | QUERY | RANGE diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 504227db444f6..dcc0c83e8c8aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -488,6 +488,7 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: + ResolveQualifyExpressions :: TimeWindowing :: SessionWindowing :: ResolveWindowTime :: @@ -1653,6 +1654,20 @@ class Analyzer( resolveExpressionByPlanChildren(resolvedWithAgg, u, includeLastResort = true) } + case Filter(QualifyExpression(cond, _), _) + if invalidQualifyAggregateFunction(cond).isDefined => + throw QueryCompilationErrors.aggregateInQualifyNotAllowedError( + invalidQualifyAggregateFunction(cond).get) + + // QUALIFY can host grouping expressions/aggregate functions when it is attached to an + // Aggregate. Resolve columns with `agg.child.output` first, similar to HAVING. + case f @ Filter(QualifyExpression(cond, selectListHasWindowFunction), agg: Aggregate) + if !cond.resolved => + val resolvedWithAgg = resolveColWithAgg(cond, agg) + val resolvedCond = + resolveExpressionByPlanChildren(resolvedWithAgg, f, includeLastResort = true) + f.copy(condition = QualifyExpression(resolvedCond, selectListHasWindowFunction)) + // RepartitionByExpression can host missing attributes that are from a descendant node. // For example, `spark.table("t").select($"a").repartition($"b")`. We can resolve `b` with // table `t` even if there is a Project node between the table scan node and Sort node. 
@@ -1701,6 +1716,22 @@ class Analyzer( q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true)) } + private def isUnresolvedAggregateFunction(unresolvedFunc: UnresolvedFunction): Boolean = { + Try(functionResolution.resolveFunction(unresolvedFunc)) match { + case Success(_: AggregateExpression | _: AggregateFunction) => true + case _ => false + } + } + + private def invalidQualifyAggregateFunction(expr: Expression): Option[Expression] = expr match { + case _: WindowExpression | _: UnresolvedWindowExpression => None + case unresolvedFunc: UnresolvedFunction if isUnresolvedAggregateFunction(unresolvedFunc) => + Some(unresolvedFunc) + case agg: AggregateExpression => Some(agg) + case agg: AggregateFunction => Some(agg) + case other => other.children.to(LazyList).flatMap(invalidQualifyAggregateFunction).headOption + } + private object MergeResolvePolicy extends Enumeration { val BOTH, SOURCE, TARGET = Value } @@ -2833,8 +2864,17 @@ class Analyzer( * * We need to make sure the expressions all fully resolved before looking for aggregate functions * and group by expressions from them. 
- */ + */ object ResolveAggregateFunctions extends Rule[LogicalPlan] { + private def invalidResolvedQualifyAggregateFunction(expr: Expression): Option[Expression] = + expr match { + case _: WindowExpression | _: UnresolvedWindowExpression => None + case agg: AggregateExpression => Some(agg) + case agg: AggregateFunction => Some(agg) + case other => + other.children.to(LazyList).flatMap(invalidResolvedQualifyAggregateFunction).headOption + } + def apply(plan: LogicalPlan): LogicalPlan = { val collatedPlan = if (conf.getConf(SQLConf.RUN_COLLATION_TYPE_CASTS_BEFORE_ALIAS_ASSIGNMENT)) { @@ -2862,6 +2902,15 @@ class Analyzer( } }) + case Filter(QualifyExpression(cond, selectListHasWindowFunction), agg: Aggregate) + if agg.resolved && cond.resolved => + invalidResolvedQualifyAggregateFunction(cond).foreach { agg => + throw QueryCompilationErrors.aggregateInQualifyNotAllowedError(agg) + } + resolveOperatorWithAggregate(Seq(cond), agg, (newExprs, newChild) => { + Filter(QualifyExpression(newExprs.head, selectListHasWindowFunction), newChild) + }) + case Filter(cond, agg: Aggregate) if agg.resolved && cond.resolved => resolveOperatorWithAggregate(Seq(cond), agg, (newExprs, newChild) => { Filter(newExprs.head, newChild) @@ -3226,6 +3275,21 @@ class Analyzer( } } + private def invalidResolvedQualifyAggregateFunction(expr: Expression): Option[Expression] = + expr match { + case _: WindowExpression | _: UnresolvedWindowExpression => None + case agg: AggregateExpression => Some(agg) + case agg: AggregateFunction => Some(agg) + case other => + other.children.to(LazyList).flatMap(invalidResolvedQualifyAggregateFunction).headOption + } + + private def validateQualifyAggregateCondition(condition: Expression): Unit = { + invalidResolvedQualifyAggregateFunction(condition).foreach { agg => + throw QueryCompilationErrors.aggregateInQualifyNotAllowedError(agg) + } + } + /** * From a Seq of [[NamedExpression]]s, extract expressions containing window expressions and * other regular 
expressions that do not contain any window expression. For example, for @@ -3404,11 +3468,108 @@ class Analyzer( Project(windowOps.output ++ newExpressionsWithWindowFunctions, windowOps) } // end of addWindow + private def addQualifyFilter( + projectList: Seq[NamedExpression], + havingCondition: Expression, + qualifyCondition: Expression, + child: LogicalPlan): LogicalPlan = { + val qualifyAlias = Alias(qualifyCondition, "__qualify_cond")() + val (windowExpressions, regularExpressions) = extract(projectList :+ qualifyAlias) + val withProject = Project(regularExpressions, child) + val extractedFilter = Filter(havingCondition, withProject) + val withWindow = addWindow(windowExpressions, extractedFilter) + Project(projectList.map(_.toAttribute), Filter(qualifyAlias.toAttribute, withWindow)) + } + + private def addQualifyFilter( + projectList: Seq[NamedExpression], + qualifyCondition: Expression, + child: LogicalPlan): LogicalPlan = { + val qualifyAlias = Alias(qualifyCondition, "__qualify_cond")() + val (windowExpressions, regularExpressions) = extract(projectList :+ qualifyAlias) + val withProject = Project(regularExpressions, child) + val withWindow = addWindow(windowExpressions, withProject) + Project(projectList.map(_.toAttribute), Filter(qualifyAlias.toAttribute, withWindow)) + } + + private def addQualifyFilter( + groupingExprs: Seq[Expression], + aggregateExprs: Seq[NamedExpression], + qualifyCondition: Expression, + child: LogicalPlan): LogicalPlan = { + val withAggregate = Aggregate(groupingExprs, aggregateExprs, child) + validateQualifyAggregateCondition(qualifyCondition) + + val qualifyAlias = Alias(qualifyCondition, "__qualify_cond")() + val (windowExpressions, extractedAggregateExprs) = extract(aggregateExprs :+ qualifyAlias) + val extractedAggregate = Aggregate(groupingExprs, extractedAggregateExprs, child) + val withWindow = addWindow(windowExpressions, extractedAggregate) + Project(aggregateExprs.map(_.toAttribute), Filter(qualifyAlias.toAttribute, 
withWindow)) + } + + private def addQualifyFilter( + groupingExprs: Seq[Expression], + aggregateExprs: Seq[NamedExpression], + havingCondition: Expression, + qualifyCondition: Expression, + child: LogicalPlan): LogicalPlan = { + val withAggregate = Aggregate(groupingExprs, aggregateExprs, child) + val withFilter = Filter(havingCondition, withAggregate) + validateQualifyAggregateCondition(qualifyCondition) + + val qualifyAlias = Alias(qualifyCondition, "__qualify_cond")() + val (windowExpressions, extractedAggregateExprs) = extract(aggregateExprs :+ qualifyAlias) + val extractedAggregate = Aggregate(groupingExprs, extractedAggregateExprs, child) + val extractedFilter = Filter(havingCondition, extractedAggregate) + val withWindow = addWindow(windowExpressions, extractedFilter) + Project(aggregateExprs.map(_.toAttribute), Filter(qualifyAlias.toAttribute, withWindow)) + } + // We have to use transformDown at here to make sure the rule of // "Aggregate with Having clause" will be triggered. def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning( _.containsPattern(WINDOW_EXPRESSION), ruleId) { + case f @ Filter(QualifyExpression(condition, _), a @ Aggregate(groupingExprs, aggregateExprs, + child, _)) + if hasWindowFunction(condition) && + condition.resolved && + child.resolved && + a.expressions.forall(_.resolved) && + !aggregateExprs.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) => + addQualifyFilter(groupingExprs, aggregateExprs, condition, child) + + case f @ Filter(QualifyExpression(condition, _), + Filter(havingCondition, a @ Aggregate(groupingExprs, aggregateExprs, child, _))) + if hasWindowFunction(condition) && + condition.resolved && + child.resolved && + havingCondition.resolved && + a.expressions.forall(_.resolved) && + !aggregateExprs.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) => + addQualifyFilter(groupingExprs, aggregateExprs, havingCondition, condition, child) + + case f @ 
Filter(QualifyExpression(condition, _), p @ Project(projectList, child)) + if hasWindowFunction(condition) && + condition.resolved && + child.resolved && + p.expressions.forall(_.resolved) && + !projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) => + addQualifyFilter(projectList, condition, child) + + case f @ Filter(QualifyExpression(condition, _), Filter(havingCondition, + p @ Project(projectList, child))) + if hasWindowFunction(condition) && + condition.resolved && + child.resolved && + havingCondition.resolved && + p.expressions.forall(_.resolved) && + !projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) => + addQualifyFilter(projectList, havingCondition, condition, child) + + case f @ Filter(QualifyExpression(_, _), _) => + f + case Filter(condition, _) if hasWindowFunction(condition) => throw QueryCompilationErrors.windowFunctionNotAllowedError("WHERE") @@ -4249,3 +4410,32 @@ object ResolveUnresolvedHaving extends Rule[LogicalPlan] { } } } + +/** + * Rewrites QUALIFY predicates into regular filters once the current query's window expressions + * have been materialized. 
+ */ +object ResolveQualifyExpressions extends Rule[LogicalPlan] { + private def invalidResolvedQualifyAggregateFunction(expr: Expression): Option[Expression] = + expr match { + case _: WindowExpression | _: UnresolvedWindowExpression => None + case agg: AggregateExpression => Some(agg) + case agg: AggregateFunction => Some(agg) + case other => + other.children.to(LazyList).flatMap(invalidResolvedQualifyAggregateFunction).headOption + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(_.containsPattern(FILTER), ruleId) { + case Filter(QualifyExpression(condition, selectListHasWindowFunction), child) + if condition.resolved && child.resolved => + invalidResolvedQualifyAggregateFunction(condition).foreach { agg => + throw QueryCompilationErrors.aggregateInQualifyNotAllowedError(agg) + } + if (!selectListHasWindowFunction) { + throw QueryCompilationErrors.qualifyRequiresWindowFunctionError() + } + Filter(condition, child) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 622b0c319f991..58e9bf5f49bdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -1135,6 +1135,20 @@ case class UnresolvedHaving( final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_HAVING) } +/** + * Marks a filter predicate originating from a QUALIFY clause until window extraction finishes. 
+ */ +case class QualifyExpression( + child: Expression, + selectListHasWindowFunction: Boolean) + extends UnaryExpression with Unevaluable { + override def dataType: DataType = child.dataType + override def nullable: Boolean = child.nullable + override lazy val resolved: Boolean = child.resolved + override protected def withNewChildInternal(newChild: Expression): QualifyExpression = + copy(child = newChild) +} + /** * A place holder expression used in random functions, will be replaced after analyze. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f82429bae7148..aa82460c255fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -785,6 +785,7 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, + ctx.qualifyClause, plan, isPipeOperatorSelect = false ) @@ -1444,6 +1445,7 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, + ctx.qualifyClause, from, isPipeOperatorSelect = false ) @@ -1492,6 +1494,16 @@ class AstBuilder extends DataTypeAstBuilder Filter(expression(ctx.booleanExpression), plan) } + /** + * Create a logical plan using a qualify clause. + */ + private def withQualifyClause( + ctx: QualifyClauseContext, + selectListHasWindowFunction: Boolean, + plan: LogicalPlan): LogicalPlan = { + Filter(QualifyExpression(expression(ctx.booleanExpression), selectListHasWindowFunction), plan) + } + /** * Add a hive-style transform (SELECT TRANSFORM/MAP/REDUCE) query specification to a logical plan. 
*/ @@ -1532,6 +1544,7 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, + qualifyClause = null, isDistinct = false, isPipeOperatorSelect = false) @@ -1567,6 +1580,7 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, + qualifyClause: QualifyClauseContext, relation: LogicalPlan, isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) { val isDistinct = selectClause.setQuantifier() != null && @@ -1580,6 +1594,7 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, + qualifyClause, isDistinct, isPipeOperatorSelect) @@ -1595,8 +1610,14 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, + qualifyClause: QualifyClauseContext, isDistinct: Boolean, isPipeOperatorSelect: Boolean): LogicalPlan = { + def hasWindowFunction(expr: Expression): Boolean = expr.exists { + case _: WindowExpression | _: UnresolvedWindowExpression => true + case _ => false + } + // Add lateral views. 
val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate) @@ -1608,6 +1629,7 @@ class AstBuilder extends DataTypeAstBuilder case (e: NamedExpression, _) => e case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc) } + val selectListHasWindowFunction = namedExpressions.exists(hasWindowFunction) def createProject() = if (namedExpressions.nonEmpty) { val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) { @@ -1655,11 +1677,15 @@ class AstBuilder extends DataTypeAstBuilder createProject() } + val withQualify = withProject.optionalMap(qualifyClause) { + (ctx, plan) => withQualifyClause(ctx, selectListHasWindowFunction, plan) + } + // Distinct val withDistinct = if (isDistinct) { - Distinct(withProject) + Distinct(withQualify) } else { - withProject + withQualify } // Window @@ -6995,6 +7021,7 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause = ctx.aggregationClause, havingClause = null, windowClause = ctx.windowClause, + qualifyClause = null, relation = left, isPipeOperatorSelect = true) }.getOrElse(Option(ctx.EXTEND).map { _ => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 1e718c02f5ea5..777915b3ebbec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -63,6 +63,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveProcedures" :: + "org.apache.spark.sql.catalyst.analysis.ResolveQualifyExpressions" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRandomSeed" :: "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences" :: 
"org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 89b574fa61fcc..61a0b92308d27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -863,6 +863,19 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("clauseName" -> clauseName)) } + def qualifyRequiresWindowFunctionError(): Throwable = { + new AnalysisException( + errorClass = "QUALIFY_REQUIRES_WINDOW_FUNCTION", + messageParameters = Map.empty) + } + + def aggregateInQualifyNotAllowedError(aggregateExpr: Expression): Throwable = { + new AnalysisException( + errorClass = "QUALIFY_AGGREGATE_NOT_ALLOWED", + messageParameters = Map("aggregateExpr" -> toSQLExpr(aggregateExpr)), + origin = aggregateExpr.origin) + } + def cannotSpecifyWindowFrameError(prettyName: String): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1035", diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/qualify.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/qualify.sql.out new file mode 100644 index 0000000000000..1403398260b2d --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/qualify.sql.out @@ -0,0 +1,188 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE OR REPLACE TEMPORARY VIEW dealer AS SELECT * FROM VALUES + (100, 'Fremont', 'Honda Civic', 10), + (100, 'Fremont', 'Honda Accord', 15), + (100, 'Fremont', 'Honda CRV', 7), + (200, 'Dublin', 'Honda Civic', 20), + (200, 'Dublin', 'Honda Accord', 10), + (200, 'Dublin', 'Honda CRV', 3), + (300, 'San Jose', 'Honda Civic', 5), + (300, 'San Jose', 'Honda Accord', 8) +AS dealer(id, city, car_model, 
quantity) +-- !query analysis +CreateViewCommand `dealer`, SELECT * FROM VALUES + (100, 'Fremont', 'Honda Civic', 10), + (100, 'Fremont', 'Honda Accord', 15), + (100, 'Fremont', 'Honda CRV', 7), + (200, 'Dublin', 'Honda Civic', 20), + (200, 'Dublin', 'Honda Accord', 10), + (200, 'Dublin', 'Honda CRV', 3), + (300, 'San Jose', 'Honda Civic', 5), + (300, 'San Jose', 'Honda Accord', 8) +AS dealer(id, city, car_model, quantity), false, true, LocalTempView, UNSUPPORTED, true + +- Project [id#x, city#x, car_model#x, quantity#x] + +- SubqueryAlias dealer + +- LocalRelation [id#x, city#x, car_model#x, quantity#x] + + +-- !query +SELECT city, car_model, RANK() OVER (PARTITION BY car_model ORDER BY quantity) AS rank +FROM dealer +QUALIFY rank = 1 +ORDER BY car_model, city +-- !query analysis +Sort [car_model#x ASC NULLS FIRST, city#x ASC NULLS FIRST], true ++- Filter (rank#x = 1) + +- Project [city#x, car_model#x, rank#x] + +- Project [city#x, car_model#x, quantity#x, rank#x, rank#x] + +- Window [rank(quantity#x) windowspecdefinition(car_model#x, quantity#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#x], [car_model#x], [quantity#x ASC NULLS FIRST] + +- Project [city#x, car_model#x, quantity#x] + +- SubqueryAlias dealer + +- View (`dealer`, [id#x, city#x, car_model#x, quantity#x]) + +- Project [cast(id#x as int) AS id#x, cast(city#x as string) AS city#x, cast(car_model#x as string) AS car_model#x, cast(quantity#x as int) AS quantity#x] + +- Project [id#x, city#x, car_model#x, quantity#x] + +- SubqueryAlias dealer + +- LocalRelation [id#x, city#x, car_model#x, quantity#x] + + +-- !query +SELECT city, car_model +FROM dealer +QUALIFY RANK() OVER (PARTITION BY car_model ORDER BY quantity) = 1 +ORDER BY car_model, city +-- !query analysis +Sort [car_model#x ASC NULLS FIRST, city#x ASC NULLS FIRST], true ++- Project [city#x, car_model#x] + +- Project [city#x, car_model#x, quantity#x] + +- Filter __qualify_cond#x: boolean + +- 
Project [city#x, car_model#x, quantity#x, _we0#x, (_we0#x = 1) AS __qualify_cond#x] + +- Window [rank(quantity#x) windowspecdefinition(car_model#x, quantity#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#x], [car_model#x], [quantity#x ASC NULLS FIRST] + +- Project [city#x, car_model#x, quantity#x] + +- SubqueryAlias dealer + +- View (`dealer`, [id#x, city#x, car_model#x, quantity#x]) + +- Project [cast(id#x as int) AS id#x, cast(city#x as string) AS city#x, cast(car_model#x as string) AS car_model#x, cast(quantity#x as int) AS quantity#x] + +- Project [id#x, city#x, car_model#x, quantity#x] + +- SubqueryAlias dealer + +- LocalRelation [id#x, city#x, car_model#x, quantity#x] + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW testData2 AS SELECT * FROM VALUES + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 3) +AS testData2(a, b) +-- !query analysis +CreateViewCommand `testData2`, SELECT * FROM VALUES + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 3) +AS testData2(a, b), false, true, LocalTempView, UNSUPPORTED, true + +- Project [a#x, b#x] + +- SubqueryAlias testData2 + +- LocalRelation [a#x, b#x] + + +-- !query +SELECT a, SUM(b) AS total +FROM testData2 +GROUP BY a +HAVING SUM(b) > 2 +QUALIFY ROW_NUMBER() OVER (ORDER BY a DESC) = 1 +-- !query analysis +Project [a#x, total#xL] ++- Filter __qualify_cond#x: boolean + +- Project [a#x, total#xL, _we0#x, (_we0#x = 1) AS __qualify_cond#x] + +- Window [row_number() windowspecdefinition(a#x DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#x], [a#x DESC NULLS LAST] + +- Filter (total#xL > cast(2 as bigint)) + +- Aggregate [a#x], [a#x, sum(b#x) AS total#xL] + +- SubqueryAlias testdata2 + +- View (`testData2`, [a#x, b#x]) + +- Project [cast(a#x as int) AS a#x, cast(b#x as int) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias testData2 + +- LocalRelation [a#x, b#x] + + +-- !query +SELECT a, SUM(b) AS total, ROW_NUMBER() OVER (ORDER BY a) 
AS rn +FROM testData2 +GROUP BY a +QUALIFY total > 1 +ORDER BY a +-- !query analysis +Sort [a#x ASC NULLS FIRST], true ++- Filter (total#xL > cast(1 as bigint)) + +- Project [a#x, total#xL, rn#x] + +- Project [a#x, total#xL, rn#x, rn#x] + +- Window [row_number() windowspecdefinition(a#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#x], [a#x ASC NULLS FIRST] + +- Aggregate [a#x], [a#x, sum(b#x) AS total#xL] + +- SubqueryAlias testdata2 + +- View (`testData2`, [a#x, b#x]) + +- Project [cast(a#x as int) AS a#x, cast(b#x as int) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias testData2 + +- LocalRelation [a#x, b#x] + + +-- !query +SELECT a, total +FROM ( + SELECT a, SUM(b) AS total + FROM testData2 + GROUP BY a +) t +QUALIFY ROW_NUMBER() OVER (ORDER BY a) = 1 AND total > 1 +-- !query analysis +Project [a#x, total#xL] ++- Filter __qualify_cond#x: boolean + +- Project [a#x, total#xL, _we0#x, ((_we0#x = 1) AND (total#xL > cast(1 as bigint))) AS __qualify_cond#x] + +- Window [row_number() windowspecdefinition(a#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#x], [a#x ASC NULLS FIRST] + +- Project [a#x, total#xL] + +- SubqueryAlias t + +- Aggregate [a#x], [a#x, sum(b#x) AS total#xL] + +- SubqueryAlias testdata2 + +- View (`testData2`, [a#x, b#x]) + +- Project [cast(a#x as int) AS a#x, cast(b#x as int) AS b#x] + +- Project [a#x, b#x] + +- SubqueryAlias testData2 + +- LocalRelation [a#x, b#x] + + +-- !query +SELECT a +FROM testData2 +QUALIFY a = 1 +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "QUALIFY_REQUIRES_WINDOW_FUNCTION", + "sqlState" : "42903" +} + + +-- !query +SELECT a, RANK() OVER (ORDER BY b) AS rank +FROM testData2 +QUALIFY COUNT(1) > 1 +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "QUALIFY_AGGREGATE_NOT_ALLOWED", + "sqlState" : "42903", + "messageParameters" : { + "aggregateExpr" : "\"count(1)\"" + 
}, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 67, + "stopIndex" : 74, + "fragment" : "COUNT(1)" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/qualify.sql b/sql/core/src/test/resources/sql-tests/inputs/qualify.sql new file mode 100644 index 0000000000000..28dc782733968 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/qualify.sql @@ -0,0 +1,56 @@ +CREATE OR REPLACE TEMPORARY VIEW dealer AS SELECT * FROM VALUES + (100, 'Fremont', 'Honda Civic', 10), + (100, 'Fremont', 'Honda Accord', 15), + (100, 'Fremont', 'Honda CRV', 7), + (200, 'Dublin', 'Honda Civic', 20), + (200, 'Dublin', 'Honda Accord', 10), + (200, 'Dublin', 'Honda CRV', 3), + (300, 'San Jose', 'Honda Civic', 5), + (300, 'San Jose', 'Honda Accord', 8) +AS dealer(id, city, car_model, quantity); + +SELECT city, car_model, RANK() OVER (PARTITION BY car_model ORDER BY quantity) AS rank +FROM dealer +QUALIFY rank = 1 +ORDER BY car_model, city; + +SELECT city, car_model +FROM dealer +QUALIFY RANK() OVER (PARTITION BY car_model ORDER BY quantity) = 1 +ORDER BY car_model, city; + +CREATE OR REPLACE TEMPORARY VIEW testData2 AS SELECT * FROM VALUES + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 3) +AS testData2(a, b); + +SELECT a, SUM(b) AS total +FROM testData2 +GROUP BY a +HAVING SUM(b) > 2 +QUALIFY ROW_NUMBER() OVER (ORDER BY a DESC) = 1; + +SELECT a, SUM(b) AS total, ROW_NUMBER() OVER (ORDER BY a) AS rn +FROM testData2 +GROUP BY a +QUALIFY total > 1 +ORDER BY a; + +SELECT a, total +FROM ( + SELECT a, SUM(b) AS total + FROM testData2 + GROUP BY a +) t +QUALIFY ROW_NUMBER() OVER (ORDER BY a) = 1 AND total > 1; + +SELECT a +FROM testData2 +QUALIFY a = 1; + +SELECT a, RANK() OVER (ORDER BY b) AS rank +FROM testData2 +QUALIFY COUNT(1) > 1; diff --git a/sql/core/src/test/resources/sql-tests/results/qualify.sql.out b/sql/core/src/test/resources/sql-tests/results/qualify.sql.out new file mode 100644 index 0000000000000..5c34763953a19 --- /dev/null 
+++ b/sql/core/src/test/resources/sql-tests/results/qualify.sql.out @@ -0,0 +1,134 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE OR REPLACE TEMPORARY VIEW dealer AS SELECT * FROM VALUES + (100, 'Fremont', 'Honda Civic', 10), + (100, 'Fremont', 'Honda Accord', 15), + (100, 'Fremont', 'Honda CRV', 7), + (200, 'Dublin', 'Honda Civic', 20), + (200, 'Dublin', 'Honda Accord', 10), + (200, 'Dublin', 'Honda CRV', 3), + (300, 'San Jose', 'Honda Civic', 5), + (300, 'San Jose', 'Honda Accord', 8) +AS dealer(id, city, car_model, quantity) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT city, car_model, RANK() OVER (PARTITION BY car_model ORDER BY quantity) AS rank +FROM dealer +QUALIFY rank = 1 +ORDER BY car_model, city +-- !query schema +struct +-- !query output +San Jose Honda Accord 1 +Dublin Honda CRV 1 +San Jose Honda Civic 1 + + +-- !query +SELECT city, car_model +FROM dealer +QUALIFY RANK() OVER (PARTITION BY car_model ORDER BY quantity) = 1 +ORDER BY car_model, city +-- !query schema +struct +-- !query output +San Jose Honda Accord +Dublin Honda CRV +San Jose Honda Civic + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW testData2 AS SELECT * FROM VALUES + (1, 1), + (1, 2), + (2, 1), + (2, 2), + (3, 3) +AS testData2(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT a, SUM(b) AS total +FROM testData2 +GROUP BY a +HAVING SUM(b) > 2 +QUALIFY ROW_NUMBER() OVER (ORDER BY a DESC) = 1 +-- !query schema +struct +-- !query output +3 3 + + +-- !query +SELECT a, SUM(b) AS total, ROW_NUMBER() OVER (ORDER BY a) AS rn +FROM testData2 +GROUP BY a +QUALIFY total > 1 +ORDER BY a +-- !query schema +struct +-- !query output +1 3 1 +2 3 2 +3 3 3 + + +-- !query +SELECT a, total +FROM ( + SELECT a, SUM(b) AS total + FROM testData2 + GROUP BY a +) t +QUALIFY ROW_NUMBER() OVER (ORDER BY a) = 1 AND total > 1 +-- !query schema +struct +-- !query output +1 3 + + +-- !query +SELECT a +FROM testData2 +QUALIFY a = 1 +-- !query 
schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "QUALIFY_REQUIRES_WINDOW_FUNCTION", + "sqlState" : "42903" +} + + +-- !query +SELECT a, RANK() OVER (ORDER BY b) AS rank +FROM testData2 +QUALIFY COUNT(1) > 1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "QUALIFY_AGGREGATE_NOT_ALLOWED", + "sqlState" : "42903", + "messageParameters" : { + "aggregateExpr" : "\"count(1)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 67, + "stopIndex" : 74, + "fragment" : "COUNT(1)" + } ] +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 6e9f338557158..4fec8b354018b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -1053,6 +1053,185 @@ class DataFrameWindowFunctionsSuite extends QueryTest "HAVING") } + test("QUALIFY filters window function results") { + withTempView("dealer") { + Seq( + (100, "Fremont", "Honda Civic", 10), + (100, "Fremont", "Honda Accord", 15), + (100, "Fremont", "Honda CRV", 7), + (200, "Dublin", "Honda Civic", 20), + (200, "Dublin", "Honda Accord", 10), + (200, "Dublin", "Honda CRV", 3), + (300, "San Jose", "Honda Civic", 5), + (300, "San Jose", "Honda Accord", 8) + ).toDF("id", "city", "car_model", "quantity").createOrReplaceTempView("dealer") + + val expectedWithRank = Seq( + Row("San Jose", "Honda Accord", 1), + Row("Dublin", "Honda CRV", 1), + Row("San Jose", "Honda Civic", 1)) + checkAnswer( + sql( + """ + |SELECT city, car_model, RANK() OVER (PARTITION BY car_model ORDER BY quantity) AS rank + |FROM dealer + |QUALIFY rank = 1 + """.stripMargin), + expectedWithRank) + + checkAnswer( + sql( + """ + |SELECT city, car_model + |FROM dealer + |QUALIFY 
RANK() OVER (PARTITION BY car_model ORDER BY quantity) = 1 + """.stripMargin), + expectedWithRank.map(row => Row(row.getString(0), row.getString(1)))) + } + } + + test("QUALIFY filters window function results after HAVING") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkAnswer( + sql( + """ + |SELECT a, SUM(b) AS total + |FROM testData2 + |GROUP BY a + |HAVING SUM(b) > 2 + |QUALIFY ROW_NUMBER() OVER (ORDER BY a DESC) = 1 + """.stripMargin), + Row(3, 3)) + } + } + + test("QUALIFY filters window function results after legacy HAVING without GROUP BY") { + withSQLConf(SQLConf.LEGACY_HAVING_WITHOUT_GROUP_BY_AS_WHERE.key -> "true") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkAnswer( + sql( + """ + |SELECT a + |FROM testData2 + |HAVING a > 1 + |QUALIFY ROW_NUMBER() OVER (ORDER BY b, a) = 1 + """.stripMargin), + Row(2)) + } + } + } + + test("QUALIFY requires a current-query window function") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkError( + exception = intercept[AnalysisException] { + sql("SELECT a FROM testData2 QUALIFY a = 1").queryExecution.analyzed + }, + condition = "QUALIFY_REQUIRES_WINDOW_FUNCTION", + parameters = Map.empty) + + checkError( + exception = intercept[AnalysisException] { + sql( + """ + |SELECT a + |FROM (SELECT a, RANK() OVER (ORDER BY b) AS rank FROM testData2) t + |QUALIFY a = 1 + """.stripMargin).queryExecution.analyzed + }, + condition = "QUALIFY_REQUIRES_WINDOW_FUNCTION", + parameters = Map.empty) + } + } + + test("QUALIFY does not allow aggregate functions in its predicate") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkError( + exception = intercept[AnalysisException] { + sql("SELECT a, RANK() OVER (ORDER BY b) AS rank FROM testData2 QUALIFY COUNT(1) > 1") + .queryExecution.analyzed + }, + condition = "QUALIFY_AGGREGATE_NOT_ALLOWED", + parameters = 
Map("aggregateExpr" -> "\"count(1)\""), + context = ExpectedContext("COUNT(1)", 66, 73)) + + Seq( + """ + |SELECT a, SUM(b) AS total, RANK() OVER (ORDER BY SUM(b)) AS rank + |FROM testData2 + |GROUP BY a + |QUALIFY SUM(b) > 1 + """.stripMargin + ).foreach { query => + val e = withClue(query) { + intercept[AnalysisException] { + sql(query).queryExecution.analyzed + } + } + assert(e.getCondition == "QUALIFY_AGGREGATE_NOT_ALLOWED") + } + } + } + + test("QUALIFY allows aggregate aliases in its predicate") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkAnswer( + sql( + """ + |SELECT a, SUM(b) AS total, ROW_NUMBER() OVER (ORDER BY a) AS rn + |FROM testData2 + |GROUP BY a + |QUALIFY total > 1 + """.stripMargin), + Seq(Row(1, 3, 1), Row(2, 3, 2), Row(3, 3, 3))) + + checkAnswer( + sql( + """ + |SELECT a, SUM(b) AS total + |FROM testData2 + |GROUP BY a + |QUALIFY ROW_NUMBER() OVER (ORDER BY a) = 1 AND total > 1 + """.stripMargin), + Row(1, 3)) + } + } + + test("QUALIFY allows references to columns from an inner aggregated subquery") { + withTempView("testData2") { + testData2.createOrReplaceTempView("testData2") + + checkAnswer( + sql( + """ + |SELECT a, total + |FROM ( + | SELECT a, SUM(b) AS total + | FROM testData2 + | GROUP BY a + |) t + |QUALIFY ROW_NUMBER() OVER (ORDER BY a) = 1 AND total > 1 + """.stripMargin), + Row(1, 3)) + } + } + + test("QUALIFY is non-reserved in non-ANSI mode") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + checkAnswer(sql("SELECT qualify FROM VALUES (1) AS t(qualify)"), Row(1)) + } + } + test("window functions in multiple selects") { val df = Seq( ("S1", "P1", 100), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 94e60db67ac75..465f127634118 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -22,8 +22,8 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkThrowable} import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedHaving, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Concat, GreaterThan, Literal, NullsFirst, SortOrder, UnresolvedWindowExpression, UnspecifiedFrame, WindowSpecDefinition, WindowSpecReference} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, QualifyExpression, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedHaving, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Concat, EqualTo, GreaterThan, Literal, NullsFirst, SortOrder, UnresolvedWindowExpression, UnspecifiedFrame, WindowExpression, WindowSpecDefinition, WindowSpecReference} import org.apache.spark.sql.catalyst.parser.{AbstractParser, ParseException} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.TreePattern._ @@ -787,6 +787,52 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { stop = 264)) } + test("QUALIFY clause") { + assertEqual( + """ + |SELECT a, RANK() OVER (ORDER BY b) AS rank + |FROM testData2 + |QUALIFY rank = 1 + """.stripMargin, + Filter( + QualifyExpression( + EqualTo(UnresolvedAttribute("rank"), Literal(1)), + selectListHasWindowFunction = true), + Project( + Seq( + $"a", + Alias( + WindowExpression( + UnresolvedFunction("RANK", Seq.empty, isDistinct = false), + WindowSpecDefinition( + Nil, + Seq(SortOrder($"b", Ascending, NullsFirst, Seq.empty)), + 
UnspecifiedFrame)), + "rank")()), + UnresolvedRelation(TableIdentifier("testData2"))))) + + assertEqual( + """ + |SELECT a + |FROM testData2 + |QUALIFY RANK() OVER (ORDER BY b) = 1 + """.stripMargin, + Filter( + QualifyExpression( + EqualTo( + WindowExpression( + UnresolvedFunction("RANK", Seq.empty, isDistinct = false), + WindowSpecDefinition( + Nil, + Seq(SortOrder($"b", Ascending, NullsFirst, Seq.empty)), + UnspecifiedFrame)), + Literal(1)), + selectListHasWindowFunction = false), + Project( + Seq($"a"), + UnresolvedRelation(TableIdentifier("testData2"))))) + } + test("CLEAR CACHE") { assertEqual("CLEAR CACHE", ClearCacheCommand) }