diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index 3e5dc6a0b187..65d092fc96ae 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -21,6 +21,78 @@ use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; use insta::assert_snapshot; +#[derive(Clone, Copy)] +enum ArrayAggOrder { + OrderedByValIdx, + Unordered, +} + +impl ArrayAggOrder { + fn array_agg_expr(self) -> &'static str { + match self { + Self::OrderedByValIdx => "array_agg(val ORDER BY val_idx) AS vals", + Self::Unordered => "array_agg(val) AS vals", + } + } +} + +fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String { + format!( + r#" + WITH unnested AS ( + SELECT + row_idx, + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) + ), + ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested + ) + SELECT + row_idx, + {array_agg} + FROM ranked + GROUP BY row_idx + ORDER BY row_idx + "#, + array_agg = order.array_agg_expr(), + ) +} + +fn regression_ctx() -> SessionContext { + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)) +} + +fn explain_analyze_sql(sql: &str) -> String { + format!("EXPLAIN ANALYZE {sql}") +} + +async fn formatted_physical_plan(ctx: &SessionContext, sql: &str) -> Result<String> { + let physical_plan = ctx.sql(sql).await?.create_physical_plan().await?; + Ok(displayable(physical_plan.as_ref()).indent(true).to_string()) +} + +fn assert_plan_contains(formatted: &str, required: &[&str], forbidden: &[&str]) { + for needle in required { + assert_contains!(formatted, *needle); + } + for needle in forbidden { + assert_not_contains!(formatted, *needle); + } +} + +#[tokio::test] +async fn csv_query_array_agg_distinct() -> Result<()> { + let 
ctx = SessionContext::new(); @@ -69,6 +141,87 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Ok(()) } +#[tokio::test] +async fn ordered_array_agg_after_unnest_regression() -> Result<()> { + let ctx = regression_ctx(); + let sql = array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx); + let unordered_sql = array_agg_after_unnest_sql(ArrayAggOrder::Unordered); + + let results = execute_to_batches(&ctx, &sql).await; + assert_snapshot!(batches_to_sort_string(&results), @r" + +---------+-----------+ + | row_idx | vals | + +---------+-----------+ + | 1 | [3, 1, 2] | + | 2 | [5, 4] | + +---------+-----------+ + "); + + let formatted = formatted_physical_plan(&ctx, &sql).await?; + let unordered_formatted = formatted_physical_plan(&ctx, &unordered_sql).await?; + + assert_plan_contains( + &formatted, + &[ + "UnnestExec", + "BoundedWindowAggExec", + "AggregateExec: mode=SinglePartitioned", + "aggr=[array_agg(ranked.val) ORDER BY [", + "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", + ], + &[ + "AggregateExec: mode=Partial", + "AggregateExec: mode=FinalPartitioned", + ], + ); + + // The unordered branch can use the compact GroupsAccumulator path, which also + // means `val_idx` is dead and the optimizer prunes the window entirely. 
+ assert_plan_contains( + &unordered_formatted, + &[ + "UnnestExec", + "AggregateExec: mode=Partial", + "AggregateExec: mode=FinalPartitioned", + "aggr=[array_agg(ranked.val)]", + ], + &["ORDER BY [ranked.val_idx"], + ); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(tarpaulin, ignore)] +async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> { + let ctx = regression_ctx(); + let sql = + explain_analyze_sql(&array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx)); + let actual = execute_to_batches(&ctx, &sql).await; + let formatted = arrow::util::pretty::pretty_format_batches(&actual)?.to_string(); + + assert_metrics!( + &formatted, + "UnnestExec", + "metrics=[output_rows=5", + "output_batches=1" + ); + assert_metrics!(&formatted, "BoundedWindowAggExec", "metrics=[output_rows=5"); + assert_metrics!( + &formatted, + "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", + "metrics=[output_rows=5" + ); + assert_metrics!( + &formatted, + "AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx]", + "metrics=[output_rows=2", + "peak_mem_used=" + ); + + Ok(()) +} + #[tokio::test] async fn count_partitioned() -> Result<()> { let results = diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 8cfc01380d4b..f4f67f380384 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -996,6 +996,100 @@ physical_plan 09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] +# The unordered case above is not sufficient coverage for issue 20788: +# adding ORDER BY to array_agg 
exercises the order-sensitive accumulator path +# rather than the compact GroupsAccumulator fast path used by unordered array_agg. +# This variant also uses an explicit row id plus a generated element ordinal, +# then computes row_number over that ordinal so the explain plan still pins the +# windowed execution shape from the issue while validating original list order. +query TT +EXPLAIN WITH unnested AS ( + SELECT + row_idx, + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) +), +ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested +) +SELECT + row_idx, + array_agg(val ORDER BY val_idx) AS vals +FROM ranked +GROUP BY row_idx +ORDER BY row_idx; +---- +logical_plan +01)Sort: ranked.row_idx ASC NULLS LAST +02)--Projection: ranked.row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST] AS vals +03)----Aggregate: groupBy=[[ranked.row_idx]], aggr=[[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]]] +04)------SubqueryAlias: ranked +05)--------Projection: unnested.row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS val_idx, unnested.val +06)----------WindowAggr: windowExpr=[[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +07)------------SubqueryAlias: unnested +08)--------------Projection: t.row_idx, __unnest_placeholder(t.vals,depth=1) AS UNNEST(t.vals) AS val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1) AS UNNEST(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) AS original_idx +09)----------------Unnest: lists[__unnest_placeholder(t.vals)|depth=1, 
__unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))|depth=1] structs[] +10)------------------Projection: t.row_idx, t.vals AS __unnest_placeholder(t.vals), range(Int64(1), CAST(CAST(cardinality(t.vals) AS Decimal128(20, 0)) + Decimal128(Some(1),20,0) AS Int64)) AS __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) +11)--------------------SubqueryAlias: t +12)----------------------Projection: column1 AS row_idx, column2 AS vals +13)------------------------Values: (Int64(1), List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (Int64(2), List([5, 4]) AS make_array(Int64(5),Int64(4))) +physical_plan +01)SortPreservingMergeExec: [row_idx@0 ASC NULLS LAST] +02)--ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]@1 as vals] +03)----AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]], ordering_mode=Sorted +04)------ProjectionExec: expr=[row_idx@0 as row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as val_idx, val@1 as val] +05)--------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +06)----------SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST], preserve_partitioning=[true] +07)------------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=1 +08)--------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(t.vals,depth=1)@1 as 
val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1)@2 as original_idx] +09)----------------UnnestExec +10)------------------ProjectionExec: expr=[column1@0 as row_idx, column2@1 as __unnest_placeholder(t.vals), range(1, CAST(CAST(cardinality(column2@1) AS Decimal128(20, 0)) + Some(1),20,0 AS Int64)) as __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))] +11)--------------------DataSourceExec: partitions=1, partition_sizes=[1] + +query I? +WITH unnested AS ( + SELECT + row_idx, + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) +), +ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested +) +SELECT + row_idx, + array_agg(val ORDER BY val_idx) AS vals +FROM ranked +GROUP BY row_idx +ORDER BY row_idx; +---- +1 [3, 1, 2] +2 [5, 4] + # Unnest array where data is already ordered by column2 (100, 200, 300, 400) statement ok COPY (