From 75cd4d79021e0a0bec6e245e1912b64202e8c209 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Sun, 22 Mar 2026 00:02:39 -0400 Subject: [PATCH 1/5] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. --- datafusion/sql/src/unparser/plan.rs | 37 ++++++++++++++++ datafusion/sql/tests/cases/plan_to_sql.rs | 54 ++++++++++++++++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index ca8dfa431b4f..73950b961e8b 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -828,6 +828,27 @@ impl Unparser<'_> { Some(plan_alias.alias.clone()), select.already_projected(), )?; + + // If the SubqueryAlias directly wraps a plan that builds its + // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, + // we must emit a derived subquery: (SELECT ...) AS alias. + // Without this, the recursive handler would merge those clauses + // into the outer SELECT, losing the subquery structure entirely. + if unparsed_table_scan.is_none() + && Self::requires_derived_subquery(plan_alias.input.as_ref()) + { + return self.derive( + &plan_alias.input, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + columns, + )), + false, + ); + } + // if the child plan is a TableScan with pushdown operations, we don't need to // create an additional subquery for it if !select.already_projected() && unparsed_table_scan.is_none() { @@ -1060,6 +1081,22 @@ impl Unparser<'_> { scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some() } + /// Returns true if a plan, when used as the direct child of a SubqueryAlias, + /// must be emitted as a derived subquery `(SELECT ...) AS alias`. + /// + /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, + /// window functions). + fn requires_derived_subquery(plan: &LogicalPlan) -> bool { + matches!( + plan, + LogicalPlan::Aggregate(_) + | LogicalPlan::Window(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Union(_) + ) + } + /// Try to unparse a table scan with pushdown operations into a new subquery plan. /// If the table scan is without any pushdown operations, return None. fn unparse_table_scan_pushdown( diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index aefb404ba410..24f922663645 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -23,7 +23,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{WindowFunction, WindowFunctionParams}; use datafusion_expr::test::function_stub::{ - count_udaf, max_udaf, min_udaf, sum, sum_udaf, + count_udaf, max, max_udaf, min_udaf, sum, sum_udaf, }; use datafusion_expr::{ EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union, @@ -2893,3 +2893,55 @@ fn test_json_access_3() { @r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"# ); } + +/// Test that unparsing a manually constructed join with a subquery aggregate +/// preserves the MAX aggregate function. +/// +/// Builds the equivalent of: +/// SELECT j1.j1_string FROM j1 +/// JOIN (SELECT max(j2_id) AS max_id FROM j2) AS b +/// ON j1.j1_id = b.max_id +#[test] +fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { + let context = MockContextProvider { + state: MockSessionState::default(), + }; + let j1_schema = context + .get_table_source(TableReference::bare("j1"))? + .schema(); + let j2_schema = context + .get_table_source(TableReference::bare("j2"))? + .schema(); + + // Build the right side: SELECT max(j2_id) AS max_id FROM j2 + let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; + let right_agg = LogicalPlanBuilder::from(right_scan) + .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .build()?; + let right_subquery = subquery_alias(right_agg, "b")?; + + // Build the full plan: SELECT j1.j1_string FROM j1 JOIN (...) AS b ON j1.j1_id = b.max_id + let left_scan = table_scan(Some("j1"), &j1_schema, None)?.build()?; + let plan = LogicalPlanBuilder::from(left_scan) + .join( + right_subquery, + datafusion_expr::JoinType::Inner, + ( + vec![Column::from_qualified_name("j1.j1_id")], + vec![Column::from_qualified_name("b.max_id")], + ), + None, + )? + .project(vec![col("j1.j1_string")])? + .build()?; + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?.to_string(); + let sql_upper = sql.to_uppercase(); + assert!( + sql_upper.contains("MAX("), + "Unparsed SQL should preserve the MAX aggregate function call, got: {sql}" + ); + + Ok(()) +} From 0d223f9fea321ca4667aca57c5bca483cc201aa9 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 27 Mar 2026 12:41:57 -0400 Subject: [PATCH 2/5] fix: preserve subquery structure when unparsing SubqueryAlias over Aggregate When the SQL unparser encountered a SubqueryAlias node whose direct child was an Aggregate (or other clause-building plan like Window, Sort, Limit, Union), it would flatten the subquery into a simple table alias, losing the aggregate entirely. For example, a plan representing: SELECT j1.col FROM j1 JOIN (SELECT max(id) AS m FROM j2) AS b ON j1.id = b.m would unparse to: SELECT j1.col FROM j1 INNER JOIN j2 AS b ON j1.id = b.m dropping the MAX aggregate and the subquery. Root cause: the SubqueryAlias handler in select_to_sql_recursively would call subquery_alias_inner_query_and_columns (which only unwraps Projection children) and unparse_table_scan_pushdown (which only handles TableScan/SubqueryAlias/Projection). When both returned nothing useful for an Aggregate child, the code recursed directly into the Aggregate, merging its GROUP BY into the outer SELECT instead of wrapping it in a derived subquery. The fix adds an early check: if the SubqueryAlias's direct child is a plan type that builds its own SELECT clauses (Aggregate, Window, Sort, Limit, Union), emit it as a derived subquery via self.derive() with the alias always attached, rather than falling through to the recursive path that would flatten it. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/plan.rs | 2 +- datafusion/sql/tests/cases/plan_to_sql.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 73950b961e8b..0b9f24bc8732 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -1085,7 +1085,7 @@ impl Unparser<'_> { /// must be emitted as a derived subquery `(SELECT ...) AS alias`. /// /// Plans like Aggregate or Window build their own SELECT clauses (GROUP BY, - /// window functions). + /// window functions). fn requires_derived_subquery(plan: &LogicalPlan) -> bool { matches!( plan, diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 24f922663645..db94e32c8d91 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2916,7 +2916,10 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { // Build the right side: SELECT max(j2_id) AS max_id FROM j2 let right_scan = table_scan(Some("j2"), &j2_schema, None)?.build()?; let right_agg = LogicalPlanBuilder::from(right_scan) - .aggregate(vec![] as Vec, vec![max(col("j2.j2_id")).alias("max_id")])? + .aggregate( + vec![] as Vec, + vec![max(col("j2.j2_id")).alias("max_id")], + )? .build()?; let right_subquery = subquery_alias(right_agg, "b")?; From 42f7f64e6a9cd400dc03e10a498bc32ffd67be57 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Fri, 3 Apr 2026 22:21:44 -0400 Subject: [PATCH 3/5] Fixes in PR --- datafusion/sql/src/unparser/plan.rs | 33 +++++++++++++++++++---- datafusion/sql/tests/cases/plan_to_sql.rs | 14 ++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 0b9f24bc8732..c0b77b9dba88 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -829,17 +829,40 @@ impl Unparser<'_> { select.already_projected(), )?; - // If the SubqueryAlias directly wraps a plan that builds its - // own SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds + // If the (possibly rewritten) inner plan builds its own + // SELECT clauses (e.g. Aggregate adds GROUP BY, Window adds // OVER, etc.) and unparse_table_scan_pushdown couldn't reduce it, // we must emit a derived subquery: (SELECT ...) AS alias. // Without this, the recursive handler would merge those clauses // into the outer SELECT, losing the subquery structure entirely. - if unparsed_table_scan.is_none() - && Self::requires_derived_subquery(plan_alias.input.as_ref()) + if unparsed_table_scan.is_none() && Self::requires_derived_subquery(plan) { + // When the dialect does not support column aliases in + // table aliases (e.g. SQLite), inject the aliases into + // the inner projection before wrapping as a derived + // subquery. + if !columns.is_empty() + && !self.dialect.supports_column_alias_in_table_alias() + { + let Ok(rewritten_plan) = + inject_column_aliases_into_subquery(plan.clone(), columns) + else { + return internal_err!( + "Failed to transform SubqueryAlias plan" + ); + }; + return self.derive( + &rewritten_plan, + relation, + Some(self.new_table_alias( + plan_alias.alias.table().to_string(), + vec![], + )), + false, + ); + } return self.derive( - &plan_alias.input, + plan, relation, Some(self.new_table_alias( plan_alias.alias.table().to_string(), diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index db94e32c8d91..9aff55582577 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2894,6 +2894,20 @@ fn test_json_access_3() { ); } +/// Roundtrip test for a subquery aggregate with column aliases. +/// Ensures that `subquery_alias_inner_query_and_columns` unwrapping +/// a Projection -> Aggregate still triggers the derived-subquery path. +#[test] +fn roundtrip_subquery_aggregate_with_column_alias() -> Result<(), DataFusionError> { + roundtrip_statement_with_dialect_helper!( + sql: "SELECT id FROM (SELECT max(j1_id) FROM j1) AS c(id)", + parser_dialect: GenericDialect {}, + unparser_dialect: UnparserDefaultDialect {}, + expected: @"SELECT c.id FROM (SELECT max(j1.j1_id) FROM j1) AS c (id)", + ); + Ok(()) +} + /// Test that unparsing a manually constructed join with a subquery aggregate /// preserves the MAX aggregate function. /// From ed46d9d5d8023214b8e7f50c7fc49bd13f60ca54 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Wed, 8 Apr 2026 20:48:22 -0400 Subject: [PATCH 4/5] fix: Unparser drops column alias needed by ORDER BY when flattening Projection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In `rewrite_plan_for_sort_on_non_projected_fields`, when the outer Projection excludes a Sort column that was defined as an alias in the inner Projection (e.g. `Z AS c`), the rewrite replaced the inner Projection's expressions with only the outer Projection's mapped expressions, dropping the alias definition. This left ORDER BY referencing a non-existent column. The fix inlines the underlying physical expression into the Sort when an alias is dropped (e.g. `ORDER BY c` → `ORDER BY t."Z"`). Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/rewrite.rs | 40 +++++++++++++++++ datafusion/sql/tests/cases/plan_to_sql.rs | 54 +++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 723cdcd687f7..3364874a7334 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -254,6 +254,46 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( .map(|e| map.get(e).unwrap_or(e).clone()) .collect::>(); + // Build a reverse map from alias name → underlying expression for + // sort columns that are dropped from the outer projection. When + // the inner Projection is trimmed to `new_exprs` below, any alias + // that only existed in the inner Projection disappears, so ORDER BY + // references to it become dangling. We inline the physical + // expression instead (e.g. ORDER BY "c" → ORDER BY "Z"). + let projected_aliases: HashSet<&str> = new_exprs + .iter() + .filter_map(|e| match e { + Expr::Alias(alias) => Some(alias.name.as_str()), + _ => None, + }) + .collect(); + + let alias_to_underlying: HashMap = inner_p + .expr + .iter() + .filter_map(|e| match e { + Expr::Alias(alias) if !projected_aliases.contains(alias.name.as_str()) => { + Some((alias.name.clone(), (*alias.expr).clone())) + } + _ => None, + }) + .collect(); + + if !alias_to_underlying.is_empty() { + for sort_expr in &mut sort.expr { + let mut expr = sort_expr.expr.clone(); + while let Expr::Alias(alias) = expr { + expr = *alias.expr; + } + if let Expr::Column(ref col) = expr { + let name = col.name(); + if let Some(underlying) = alias_to_underlying.get(name) { + sort_expr.expr = underlying.clone(); + } + } + } + } + inner_p.expr.clone_from(&new_exprs); sort.input = Arc::new(LogicalPlan::Projection(inner_p)); diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 1d80b5b4816c..33b028b1ca52 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2973,3 +2973,57 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { Ok(()) } + +/// Regression test: when the outer Projection excludes a Sort column that was +/// defined as an alias in the inner Projection (through a SubqueryAlias), the +/// Unparser must either preserve the subquery boundary or inline the physical +/// column name into the ORDER BY clause. +/// +/// See: https://github.com/apache/datafusion/issues/XXXX +#[test] +fn test_sort_on_aliased_column_dropped_by_outer_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("X", DataType::Utf8, true), + Field::new("Y", DataType::Utf8, true), + Field::new("Z", DataType::Utf8, true), + ]); + + // Build: + // Projection: [a, b] -- outer: excludes sort column "c" + // Sort: [c DESC, fetch=1] -- references alias "c" + // Projection: [X AS a, Y AS b, Z AS c] -- defines alias "c" + // SubqueryAlias: t + // TableScan: phys_table [X, Y, Z] + let plan = table_scan(Some("phys_table"), &schema, None)? + .alias("t")? + .project(vec![ + Expr::Column(Column::new(Some(TableReference::bare("t")), "X")) + .alias("a"), + Expr::Column(Column::new(Some(TableReference::bare("t")), "Y")) + .alias("b"), + Expr::Column(Column::new(Some(TableReference::bare("t")), "Z")) + .alias("c"), + ])? + .sort_with_limit( + vec![Expr::Column(Column::new_unqualified("c")).sort(false, true)], + Some(1), + )? + .project(vec![ + Expr::Column(Column::new_unqualified("a")), + Expr::Column(Column::new_unqualified("b")), + ])? + .build()?; + + let unparser = Unparser::default(); + let sql = unparser.plan_to_sql(&plan)?; + + // The sort column "c" (aliased from "Z" in the inner Projection) must be + // inlined to the physical column in ORDER BY, since the outer Projection + // excludes it from the SELECT list. + assert_snapshot!( + sql, + @r#"SELECT t."X" AS a, t."Y" AS b FROM phys_table AS t ORDER BY t."Z" DESC NULLS FIRST LIMIT 1"# + ); + + Ok(()) +} From ebd12eb4e7ece38bbdf254be6b791e3fb40f90c0 Mon Sep 17 00:00:00 2001 From: Yonatan Striem-Amit Date: Wed, 8 Apr 2026 20:57:53 -0400 Subject: [PATCH 5/5] chore: tidy up comments in sort alias fix Clarify what the algorithm does and why, remove example-specific column names from the implementation comments. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/sql/src/unparser/rewrite.rs | 20 ++++++++++---------- datafusion/sql/tests/cases/plan_to_sql.rs | 14 ++++++-------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 3364874a7334..5d92d366cdf4 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -254,12 +254,13 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( .map(|e| map.get(e).unwrap_or(e).clone()) .collect::>(); - // Build a reverse map from alias name → underlying expression for - // sort columns that are dropped from the outer projection. When - // the inner Projection is trimmed to `new_exprs` below, any alias - // that only existed in the inner Projection disappears, so ORDER BY - // references to it become dangling. We inline the physical - // expression instead (e.g. ORDER BY "c" → ORDER BY "Z"). + // The inner Projection may define aliases that the Sort references + // but the outer Projection does not include. Since we are about to + // replace the inner Projection's expressions with `new_exprs` (which + // only contains the outer Projection's columns), those alias + // definitions will be lost. To keep the Sort valid, rewrite any + // sort expression that references a dropped alias so that it uses + // the alias's underlying expression instead. let projected_aliases: HashSet<&str> = new_exprs .iter() .filter_map(|e| match e { @@ -268,7 +269,7 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( }) .collect(); - let alias_to_underlying: HashMap = inner_p + let dropped_aliases: HashMap = inner_p .expr .iter() .filter_map(|e| match e { @@ -279,15 +280,14 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields( }) .collect(); - if !alias_to_underlying.is_empty() { + if !dropped_aliases.is_empty() { for sort_expr in &mut sort.expr { let mut expr = sort_expr.expr.clone(); while let Expr::Alias(alias) = expr { expr = *alias.expr; } if let Expr::Column(ref col) = expr { - let name = col.name(); - if let Some(underlying) = alias_to_underlying.get(name) { + if let Some(underlying) = dropped_aliases.get(col.name()) { sort_expr.expr = underlying.clone(); } } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 33b028b1ca52..acaaf9db4b42 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2974,12 +2974,12 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> { Ok(()) } -/// Regression test: when the outer Projection excludes a Sort column that was -/// defined as an alias in the inner Projection (through a SubqueryAlias), the -/// Unparser must either preserve the subquery boundary or inline the physical -/// column name into the ORDER BY clause. +/// Regression test for https://github.com/apache/datafusion/issues/21490 /// -/// See: https://github.com/apache/datafusion/issues/XXXX +/// When the outer Projection excludes a Sort column whose definition only +/// exists as an alias in the inner Projection, the Unparser must inline the +/// underlying expression into ORDER BY rather than emitting the now-missing +/// alias name. #[test] fn test_sort_on_aliased_column_dropped_by_outer_projection() -> Result<()> { let schema = Schema::new(vec![ @@ -3017,9 +3017,7 @@ fn test_sort_on_aliased_column_dropped_by_outer_projection() -> Result<()> { let unparser = Unparser::default(); let sql = unparser.plan_to_sql(&plan)?; - // The sort column "c" (aliased from "Z" in the inner Projection) must be - // inlined to the physical column in ORDER BY, since the outer Projection - // excludes it from the SELECT list. + // ORDER BY must reference the physical column, not the dropped alias. assert_snapshot!( sql, @r#"SELECT t."X" AS a, t."Y" AS b FROM phys_table AS t ORDER BY t."Z" DESC NULLS FIRST LIMIT 1"#