From c14d60791a76b069b13fb49b2fb8d575b45207b3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 18:07:31 +0800 Subject: [PATCH 1/8] Add test for ordered_array_agg after unnest Verify query result for unnest -> row_number -> group by -> array_agg with ordered output. Ensure physical plan includes UnnestExec, BoundedWindowAggExec, and ordered array_agg path. --- datafusion/core/tests/sql/aggregates/basic.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index 3e5dc6a0b187..d9e9bb04aac8 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -69,6 +69,64 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Ok(()) } +#[tokio::test] +async fn ordered_array_agg_after_unnest_regression() -> Result<()> { + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(4), + ); + let sql = r#" + WITH indexed AS ( + SELECT + ROW_NUMBER() OVER () AS row_idx, + vals + FROM (VALUES ([3, 1, 2]), ([5, 4])) AS t(vals) + ), + unnested AS ( + SELECT + row_idx, + unnest(vals) AS val + FROM indexed + ), + ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER (PARTITION BY row_idx ORDER BY val) AS val_idx, + val + FROM unnested + ) + SELECT + row_idx, + array_agg(val ORDER BY val_idx) AS vals + FROM ranked + GROUP BY row_idx + ORDER BY row_idx + "#; + + let results = execute_to_batches(&ctx, sql).await; + assert_snapshot!(batches_to_sort_string(&results), @r" + +---------+-----------+ + | row_idx | vals | + +---------+-----------+ + | 1 | [1, 2, 3] | + | 2 | [4, 5] | + +---------+-----------+ + "); + + let dataframe = ctx.sql(sql).await?; + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); + + assert_contains!(&formatted, "UnnestExec"); + assert_contains!(&formatted, "BoundedWindowAggExec"); + assert_contains!(&formatted, "array_agg("); + assert_contains!( + &formatted, + "ORDER BY [ranked.val_idx ASC NULLS LAST]" + ); + + Ok(()) +} + #[tokio::test] async fn count_partitioned() -> Result<()> { let results = From 1273d5ef3a5c602cb0b8c71696f0f14e7f69fce2 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 18:38:07 +0800 Subject: [PATCH 2/8] Add ordered EXPLAIN case to unnest.slt Introduce an ordered EXPLAIN regression case next to the existing unordered coverage in unnest.slt. Add a comment clarifying why unordered array_agg does not suffice for issue 20788. Update the Rust plan assertion in basic.rs to be less brittle by verifying the aggregate shape instead of relying on the exact alias-qualified ORDER BY expression text. --- datafusion/core/tests/sql/aggregates/basic.rs | 6 +- datafusion/sqllogictest/test_files/unnest.slt | 62 +++++++++++++++++++ 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index d9e9bb04aac8..eef25b108043 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -118,11 +118,7 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { assert_contains!(&formatted, "UnnestExec"); assert_contains!(&formatted, "BoundedWindowAggExec"); - assert_contains!(&formatted, "array_agg("); - assert_contains!( - &formatted, - "ORDER BY [ranked.val_idx ASC NULLS LAST]" - ); + assert_contains!(&formatted, "aggr=[array_agg(ranked.val) ORDER BY ["); Ok(()) } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 8cfc01380d4b..526c03f82e07 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -996,6 +996,68 @@ physical_plan 09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] +# The unordered case above is not sufficient coverage for issue 20788: +# adding ORDER BY to array_agg exercises the order-sensitive accumulator path +# rather than the compact GroupsAccumulator fast path used by unordered array_agg. +query TT +EXPLAIN WITH indexed AS ( + SELECT + ROW_NUMBER() OVER () AS row_idx, + vals + FROM (VALUES ([3, 1, 2]), ([5, 4])) AS t(vals) +), +unnested AS ( + SELECT + row_idx, + unnest(vals) AS val + FROM indexed +), +ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER (PARTITION BY row_idx ORDER BY val) AS val_idx, + val + FROM unnested +) +SELECT + row_idx, + array_agg(val ORDER BY val_idx) AS vals +FROM ranked +GROUP BY row_idx +ORDER BY row_idx; +---- +logical_plan +01)Sort: ranked.row_idx ASC NULLS LAST +02)--Projection: ranked.row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST] AS vals +03)----Aggregate: groupBy=[[ranked.row_idx]], aggr=[[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]]] +04)------SubqueryAlias: ranked +05)--------Projection: unnested.row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS val_idx, unnested.val +06)----------WindowAggr: windowExpr=[[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +07)------------SubqueryAlias: unnested +08)--------------Projection: indexed.row_idx, __unnest_placeholder(indexed.vals,depth=1) AS UNNEST(indexed.vals) AS val +09)----------------Unnest: lists[__unnest_placeholder(indexed.vals)|depth=1] structs[] +10)------------------Projection: indexed.row_idx, indexed.vals AS __unnest_placeholder(indexed.vals) +11)--------------------SubqueryAlias: indexed +12)----------------------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS row_idx, t.vals +13)------------------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +14)--------------------------SubqueryAlias: t +15)----------------------------Projection: column1 AS vals +16)------------------------------Values: (List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (List([5, 4]) AS make_array(Int64(5),Int64(4))) +physical_plan +01)SortPreservingMergeExec: [row_idx@0 ASC NULLS LAST] +02)--ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]@1 as vals] +03)----AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]], ordering_mode=Sorted +04)------ProjectionExec: expr=[row_idx@0 as row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as val_idx, val@1 as val] +05)--------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +06)----------SortExec: expr=[row_idx@0 ASC NULLS LAST, val@1 ASC NULLS LAST], preserve_partitioning=[true] +07)------------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=1, maintains_sort_order=true +08)--------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(indexed.vals,depth=1)@1 as val] +09)----------------UnnestExec +10)------------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as row_idx, vals@0 as __unnest_placeholder(indexed.vals)] +11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] +12)----------------------ProjectionExec: expr=[column1@0 as vals] +13)------------------------DataSourceExec: partitions=1, partition_sizes=[1] + # Unnest array where data is already ordered by column2 (100, 200, 300, 400) statement ok COPY ( From 360c2b6e19378deac5e2abb0aaf921ac08738341 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 18:44:11 +0800 Subject: [PATCH 3/8] Refactor dataframe binding and deduplicate assertions Inline single-use dataframe binding and reduce repetition in assert_contains! calls using a loop. Maintain all public interfaces unchanged. --- datafusion/core/tests/sql/aggregates/basic.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index eef25b108043..a53cece7061b 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -112,13 +112,16 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { +---------+-----------+ "); - let dataframe = ctx.sql(sql).await?; - let physical_plan = dataframe.create_physical_plan().await?; + let physical_plan = ctx.sql(sql).await?.create_physical_plan().await?; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - assert_contains!(&formatted, "UnnestExec"); - assert_contains!(&formatted, "BoundedWindowAggExec"); - assert_contains!(&formatted, "aggr=[array_agg(ranked.val) ORDER BY ["); + for needle in [ + "UnnestExec", + "BoundedWindowAggExec", + "aggr=[array_agg(ranked.val) ORDER BY [", + ] { + assert_contains!(&formatted, needle); + } Ok(()) } From 82b725b6efa68c594c2540461a1b03b34a5eb71b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 19:37:39 +0800 Subject: [PATCH 4/8] Refactor ordered regression to use original element position Rebuild arrays based on original element position rather than value order. Update basic.rs and unnest.slt by replacing synthetic ROW_NUMBER() with a paired unnest for ordinal generation. Switch input rows to explicit row_idx literals to eliminate dependency on ROW_NUMBER(). Add a correctness query to the SLT case in addition to explain coverage. --- datafusion/core/tests/sql/aggregates/basic.rs | 34 +++--- datafusion/sqllogictest/test_files/unnest.slt | 100 ++++++++++-------- 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index a53cece7061b..3e7b04351fe0 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -75,29 +75,21 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { SessionConfig::new().with_target_partitions(4), ); let sql = r#" - WITH indexed AS ( - SELECT - ROW_NUMBER() OVER () AS row_idx, - vals - FROM (VALUES ([3, 1, 2]), ([5, 4])) AS t(vals) - ), - unnested AS ( - SELECT - row_idx, - unnest(vals) AS val - FROM indexed - ), - ranked AS ( + WITH unnested AS ( SELECT row_idx, - ROW_NUMBER() OVER (PARTITION BY row_idx ORDER BY val) AS val_idx, - val - FROM unnested + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) ) SELECT row_idx, array_agg(val ORDER BY val_idx) AS vals - FROM ranked + FROM unnested GROUP BY row_idx ORDER BY row_idx "#; @@ -107,8 +99,8 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { +---------+-----------+ | row_idx | vals | +---------+-----------+ - | 1 | [1, 2, 3] | - | 2 | [4, 5] | + | 1 | [3, 1, 2] | + | 2 | [5, 4] | +---------+-----------+ "); @@ -117,8 +109,8 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { for needle in [ "UnnestExec", - "BoundedWindowAggExec", - "aggr=[array_agg(ranked.val) ORDER BY [", + "aggr=[array_agg(unnested.val) ORDER BY [", + "range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8(\"Int64\")))", ] { assert_contains!(&formatted, needle); } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 526c03f82e07..528b0b624a48 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -999,64 +999,74 @@ physical_plan # The unordered case above is not sufficient coverage for issue 20788: # adding ORDER BY to array_agg exercises the order-sensitive accumulator path # rather than the compact GroupsAccumulator fast path used by unordered array_agg. +# This variant also uses an explicit row id plus a generated element ordinal, +# so it validates rebuilding the original list order after unnest instead of +# merely sorting by value. query TT -EXPLAIN WITH indexed AS ( - SELECT - ROW_NUMBER() OVER () AS row_idx, - vals - FROM (VALUES ([3, 1, 2]), ([5, 4])) AS t(vals) -), -unnested AS ( - SELECT - row_idx, - unnest(vals) AS val - FROM indexed -), -ranked AS ( +EXPLAIN WITH unnested AS ( SELECT row_idx, - ROW_NUMBER() OVER (PARTITION BY row_idx ORDER BY val) AS val_idx, - val - FROM unnested + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) ) SELECT row_idx, array_agg(val ORDER BY val_idx) AS vals -FROM ranked +FROM unnested GROUP BY row_idx ORDER BY row_idx; ---- logical_plan -01)Sort: ranked.row_idx ASC NULLS LAST -02)--Projection: ranked.row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST] AS vals -03)----Aggregate: groupBy=[[ranked.row_idx]], aggr=[[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]]] -04)------SubqueryAlias: ranked -05)--------Projection: unnested.row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS val_idx, unnested.val -06)----------WindowAggr: windowExpr=[[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] -07)------------SubqueryAlias: unnested -08)--------------Projection: indexed.row_idx, __unnest_placeholder(indexed.vals,depth=1) AS UNNEST(indexed.vals) AS val -09)----------------Unnest: lists[__unnest_placeholder(indexed.vals)|depth=1] structs[] -10)------------------Projection: indexed.row_idx, indexed.vals AS __unnest_placeholder(indexed.vals) -11)--------------------SubqueryAlias: indexed -12)----------------------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS row_idx, t.vals -13)------------------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] -14)--------------------------SubqueryAlias: t -15)----------------------------Projection: column1 AS vals -16)------------------------------Values: (List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (List([5, 4]) AS make_array(Int64(5),Int64(4))) +01)Sort: unnested.row_idx ASC NULLS LAST +02)--Projection: unnested.row_idx, array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST] AS vals +03)----Aggregate: groupBy=[[unnested.row_idx]], aggr=[[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]]] +04)------SubqueryAlias: unnested +05)--------Projection: t.row_idx, __unnest_placeholder(t.vals,depth=1) AS UNNEST(t.vals) AS val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1) AS UNNEST(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) AS val_idx +06)----------Unnest: lists[__unnest_placeholder(t.vals)|depth=1, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))|depth=1] structs[] +07)------------Projection: t.row_idx, t.vals AS __unnest_placeholder(t.vals), range(Int64(1), CAST(CAST(cardinality(t.vals) AS Decimal128(20, 0)) + Decimal128(Some(1),20,0) AS Int64)) AS __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) +08)--------------SubqueryAlias: t +09)----------------Projection: column1 AS row_idx, column2 AS vals +10)------------------Values: (Int64(1), List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (Int64(2), List([5, 4]) AS make_array(Int64(5),Int64(4))) physical_plan 01)SortPreservingMergeExec: [row_idx@0 ASC NULLS LAST] -02)--ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]@1 as vals] -03)----AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]], ordering_mode=Sorted -04)------ProjectionExec: expr=[row_idx@0 as row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as val_idx, val@1 as val] -05)--------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -06)----------SortExec: expr=[row_idx@0 ASC NULLS LAST, val@1 ASC NULLS LAST], preserve_partitioning=[true] -07)------------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=1, maintains_sort_order=true -08)--------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(indexed.vals,depth=1)@1 as val] -09)----------------UnnestExec -10)------------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as row_idx, vals@0 as __unnest_placeholder(indexed.vals)] -11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] -12)----------------------ProjectionExec: expr=[column1@0 as vals] -13)------------------------DataSourceExec: partitions=1, partition_sizes=[1] +02)--SortExec: expr=[row_idx@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]@1 as vals] +04)------AggregateExec: mode=FinalPartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]] +05)--------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=4 +06)----------AggregateExec: mode=Partial, gby=[row_idx@0 as row_idx], aggr=[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]] +07)------------SortExec: expr=[val_idx@2 ASC NULLS LAST], preserve_partitioning=[true] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(t.vals,depth=1)@1 as val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1)@2 as val_idx] +10)------------------UnnestExec +11)--------------------ProjectionExec: expr=[column1@0 as row_idx, column2@1 as __unnest_placeholder(t.vals), range(1, CAST(CAST(cardinality(column2@1) AS Decimal128(20, 0)) + Some(1),20,0 AS Int64)) as __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))] +12)----------------------DataSourceExec: partitions=1, partition_sizes=[1] + +query I? +WITH unnested AS ( + SELECT + row_idx, + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) +) +SELECT + row_idx, + array_agg(val ORDER BY val_idx) AS vals +FROM unnested +GROUP BY row_idx +ORDER BY row_idx; +---- +1 [3, 1, 2] +2 [5, 4] # Unnest array where data is already ordered by column2 (100, 200, 300, 400) statement ok From 9cdc6e53755f6a273769ccf78e76a937c8b3424d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 20:13:36 +0800 Subject: [PATCH 5/8] Restore original shape in regression with corrections Reintroduce explicit row ids, unnest, and generated original element ordinals. Implement ROW_NUMBER() over partitioned indices for accurate ordering. Adjust Rust-side plan assertion for improved stability by focusing on higher-signal operators and sort shape instead of exact normalized casts. --- datafusion/core/tests/sql/aggregates/basic.rs | 19 ++++- datafusion/sqllogictest/test_files/unnest.slt | 76 ++++++++++++------- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index 3e7b04351fe0..10ff98a8d10c 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -79,17 +79,27 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { SELECT row_idx, unnest(vals) AS val, - unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx FROM ( VALUES (1, [3, 1, 2]), (2, [5, 4]) ) AS t(row_idx, vals) + ), + ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested ) SELECT row_idx, array_agg(val ORDER BY val_idx) AS vals - FROM unnested + FROM ranked GROUP BY row_idx ORDER BY row_idx "#; @@ -109,8 +119,9 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { for needle in [ "UnnestExec", - "aggr=[array_agg(unnested.val) ORDER BY [", - "range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8(\"Int64\")))", + "BoundedWindowAggExec", + "aggr=[array_agg(ranked.val) ORDER BY [", + "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", ] { assert_contains!(&formatted, needle); } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 528b0b624a48..f4f67f380384 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -1000,68 +1000,90 @@ physical_plan # adding ORDER BY to array_agg exercises the order-sensitive accumulator path # rather than the compact GroupsAccumulator fast path used by unordered array_agg. # This variant also uses an explicit row id plus a generated element ordinal, -# so it validates rebuilding the original list order after unnest instead of -# merely sorting by value. +# then computes row_number over that ordinal so the explain plan still pins the +# windowed execution shape from the issue while validating original list order. query TT EXPLAIN WITH unnested AS ( SELECT row_idx, unnest(vals) AS val, - unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx FROM ( VALUES (1, [3, 1, 2]), (2, [5, 4]) ) AS t(row_idx, vals) +), +ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested ) SELECT row_idx, array_agg(val ORDER BY val_idx) AS vals -FROM unnested +FROM ranked GROUP BY row_idx ORDER BY row_idx; ---- logical_plan -01)Sort: unnested.row_idx ASC NULLS LAST -02)--Projection: unnested.row_idx, array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST] AS vals -03)----Aggregate: groupBy=[[unnested.row_idx]], aggr=[[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]]] -04)------SubqueryAlias: unnested -05)--------Projection: t.row_idx, __unnest_placeholder(t.vals,depth=1) AS UNNEST(t.vals) AS val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1) AS UNNEST(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) AS val_idx -06)----------Unnest: lists[__unnest_placeholder(t.vals)|depth=1, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))|depth=1] structs[] -07)------------Projection: t.row_idx, t.vals AS __unnest_placeholder(t.vals), range(Int64(1), CAST(CAST(cardinality(t.vals) AS Decimal128(20, 0)) + Decimal128(Some(1),20,0) AS Int64)) AS __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) -08)--------------SubqueryAlias: t -09)----------------Projection: column1 AS row_idx, column2 AS vals -10)------------------Values: (Int64(1), List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (Int64(2), List([5, 4]) AS make_array(Int64(5),Int64(4))) +01)Sort: ranked.row_idx ASC NULLS LAST +02)--Projection: ranked.row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST] AS vals +03)----Aggregate: groupBy=[[ranked.row_idx]], aggr=[[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]]] +04)------SubqueryAlias: ranked +05)--------Projection: unnested.row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS val_idx, unnested.val +06)----------WindowAggr: windowExpr=[[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +07)------------SubqueryAlias: unnested +08)--------------Projection: t.row_idx, __unnest_placeholder(t.vals,depth=1) AS UNNEST(t.vals) AS val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1) AS UNNEST(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) AS original_idx +09)----------------Unnest: lists[__unnest_placeholder(t.vals)|depth=1, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))|depth=1] structs[] +10)------------------Projection: t.row_idx, t.vals AS __unnest_placeholder(t.vals), range(Int64(1), CAST(CAST(cardinality(t.vals) AS Decimal128(20, 0)) + Decimal128(Some(1),20,0) AS Int64)) AS __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) +11)--------------------SubqueryAlias: t +12)----------------------Projection: column1 AS row_idx, column2 AS vals +13)------------------------Values: (Int64(1), List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (Int64(2), List([5, 4]) AS make_array(Int64(5),Int64(4))) physical_plan 01)SortPreservingMergeExec: [row_idx@0 ASC NULLS LAST] -02)--SortExec: expr=[row_idx@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]@1 as vals] -04)------AggregateExec: mode=FinalPartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]] -05)--------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[row_idx@0 as row_idx], aggr=[array_agg(unnested.val) ORDER BY [unnested.val_idx ASC NULLS LAST]] -07)------------SortExec: expr=[val_idx@2 ASC NULLS LAST], preserve_partitioning=[true] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)----------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(t.vals,depth=1)@1 as val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1)@2 as val_idx] -10)------------------UnnestExec -11)--------------------ProjectionExec: expr=[column1@0 as row_idx, column2@1 as __unnest_placeholder(t.vals), range(1, CAST(CAST(cardinality(column2@1) AS Decimal128(20, 0)) + Some(1),20,0 AS Int64)) as __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))] -12)----------------------DataSourceExec: partitions=1, partition_sizes=[1] +02)--ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]@1 as vals] +03)----AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]], ordering_mode=Sorted +04)------ProjectionExec: expr=[row_idx@0 as row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as val_idx, val@1 as val] +05)--------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +06)----------SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST], preserve_partitioning=[true] +07)------------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=1 +08)--------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(t.vals,depth=1)@1 as val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1)@2 as original_idx] +09)----------------UnnestExec +10)------------------ProjectionExec: expr=[column1@0 as row_idx, column2@1 as __unnest_placeholder(t.vals), range(1, CAST(CAST(cardinality(column2@1) AS Decimal128(20, 0)) + Some(1),20,0 AS Int64)) as __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))] +11)--------------------DataSourceExec: partitions=1, partition_sizes=[1] query I? WITH unnested AS ( SELECT row_idx, unnest(vals) AS val, - unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS val_idx + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx FROM ( VALUES (1, [3, 1, 2]), (2, [5, 4]) ) AS t(row_idx, vals) +), +ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested ) SELECT row_idx, array_agg(val ORDER BY val_idx) AS vals -FROM unnested +FROM ranked GROUP BY row_idx ORDER BY row_idx; ---- From fcdf95f0d7c4f7f9003880cd68ae5cc0161d58e0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 20:22:20 +0800 Subject: [PATCH 6/8] Improve array_agg tests and assertions Extract shared array_agg_after_unnest_sql helper. Enhance ordered_array_agg tests to compare plan shapes. Add ordered_array_agg_after_unnest_explain_analyze_metrics to verify runtime metrics and memory usage. Adjust unordered test assertions to reflect optimizer behavior and demonstrate efficiency of unordered aggregation path. --- datafusion/core/tests/sql/aggregates/basic.rs | 129 +++++++++++++----- 1 file changed, 98 insertions(+), 31 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index 10ff98a8d10c..8f12a3e52aef 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -21,6 +21,46 @@ use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; use insta::assert_snapshot; +fn array_agg_after_unnest_sql(order_by_val_idx: bool) -> String { + let array_agg = if order_by_val_idx { + "array_agg(val ORDER BY val_idx) AS vals" + } else { + "array_agg(val) AS vals" + }; + + format!( + r#" + WITH unnested AS ( + SELECT + row_idx, + unnest(vals) AS val, + unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx + FROM ( + VALUES + (1, [3, 1, 2]), + (2, [5, 4]) + ) AS t(row_idx, vals) + ), + ranked AS ( + SELECT + row_idx, + ROW_NUMBER() OVER ( + PARTITION BY row_idx + ORDER BY original_idx + ) AS val_idx, + val + FROM unnested + ) + SELECT + row_idx, + {array_agg} + FROM ranked + GROUP BY row_idx + ORDER BY row_idx + "# + ) +} + #[tokio::test] async fn csv_query_array_agg_distinct() -> Result<()> { let ctx = SessionContext::new(); @@ -74,37 +114,10 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { let ctx = SessionContext::new_with_config( SessionConfig::new().with_target_partitions(4), ); - let sql = r#" - WITH unnested AS ( - SELECT - row_idx, - unnest(vals) AS val, - unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx - FROM ( - VALUES - (1, [3, 1, 2]), - (2, [5, 4]) - ) AS t(row_idx, vals) - ), - ranked AS ( - SELECT - row_idx, - ROW_NUMBER() OVER ( - PARTITION BY row_idx - ORDER BY original_idx - ) AS val_idx, - val - FROM unnested - ) - SELECT - row_idx, - array_agg(val ORDER BY val_idx) AS vals - FROM ranked - GROUP BY row_idx - ORDER BY row_idx - "#; + let sql = array_agg_after_unnest_sql(true); + let unordered_sql = array_agg_after_unnest_sql(false); - let results = execute_to_batches(&ctx, sql).await; + let results = execute_to_batches(&ctx, &sql).await; assert_snapshot!(batches_to_sort_string(&results), @r" +---------+-----------+ | row_idx | vals | @@ -114,17 +127,71 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { +---------+-----------+ "); - let physical_plan = ctx.sql(sql).await?.create_physical_plan().await?; + let physical_plan = ctx.sql(&sql).await?.create_physical_plan().await?; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); + let unordered_physical_plan = + ctx.sql(&unordered_sql).await?.create_physical_plan().await?; + let unordered_formatted = + displayable(unordered_physical_plan.as_ref()).indent(true).to_string(); for needle in [ "UnnestExec", "BoundedWindowAggExec", + "AggregateExec: mode=SinglePartitioned", "aggr=[array_agg(ranked.val) ORDER BY [", "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", ] { assert_contains!(&formatted, needle); } + assert_not_contains!(&formatted, "AggregateExec: mode=Partial"); + assert_not_contains!(&formatted, "AggregateExec: mode=FinalPartitioned"); + + for needle in [ + "UnnestExec", + "AggregateExec: mode=Partial", + "AggregateExec: mode=FinalPartitioned", + "aggr=[array_agg(ranked.val)]", + ] { + assert_contains!(&unordered_formatted, needle); + } + assert_not_contains!(&unordered_formatted, "ORDER BY [ranked.val_idx"); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(tarpaulin, ignore)] +async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> { + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(4), + ); + let sql = format!("EXPLAIN ANALYZE {}", array_agg_after_unnest_sql(true)); + let actual = execute_to_batches(&ctx, &sql).await; + let formatted = arrow::util::pretty::pretty_format_batches(&actual)? + .to_string(); + + assert_metrics!( + &formatted, + "UnnestExec", + "metrics=[output_rows=5", + "output_batches=1" + ); + assert_metrics!( + &formatted, + "BoundedWindowAggExec", + "metrics=[output_rows=5" + ); + assert_metrics!( + &formatted, + "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", + "metrics=[output_rows=5" + ); + assert_metrics!( + &formatted, + "AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx]", + "metrics=[output_rows=2", + "peak_mem_used=" + ); Ok(()) } From 842734b6c092277582b9f50d7a0e28f0c8ef287a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 21:48:55 +0800 Subject: [PATCH 7/8] feat: refactor array_agg_after_unnest_sql function to use enum for order handling - Updated the `array_agg_after_unnest_sql` function to accept an `ArrayAggOrder` enum instead of a boolean to improve readability. - Enhanced code clarity by explicitly defining ordered and unordered aggregation cases. - Updated usage and related SQL execution calls in tests for better consistency. --- datafusion/core/tests/sql/aggregates/basic.rs | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index 8f12a3e52aef..b9f3cc961f66 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -21,11 +21,17 @@ use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; use insta::assert_snapshot; -fn array_agg_after_unnest_sql(order_by_val_idx: bool) -> String { - let array_agg = if order_by_val_idx { +enum ArrayAggOrder { + OrderedByValIdx, + Unordered, +} + +fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String { + let array_agg = match order { + ArrayAggOrder::OrderedByValIdx => { "array_agg(val ORDER BY val_idx) AS vals" - } else { - "array_agg(val) AS vals" + } + ArrayAggOrder::Unordered => "array_agg(val) AS vals", }; format!( @@ -114,8 +120,8 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { let ctx = SessionContext::new_with_config( SessionConfig::new().with_target_partitions(4), ); - let sql = array_agg_after_unnest_sql(true); - let unordered_sql = array_agg_after_unnest_sql(false); + let sql = array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx); + let unordered_sql = array_agg_after_unnest_sql(ArrayAggOrder::Unordered); let results = execute_to_batches(&ctx, &sql).await; assert_snapshot!(batches_to_sort_string(&results), @r" @@ -146,6 +152,8 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { assert_not_contains!(&formatted, "AggregateExec: mode=Partial"); assert_not_contains!(&formatted, "AggregateExec: mode=FinalPartitioned"); + // The unordered branch can use the compact GroupsAccumulator path, which also + // means `val_idx` is dead and the optimizer prunes the window entirely. for needle in [ "UnnestExec", "AggregateExec: mode=Partial", @@ -165,7 +173,10 @@ async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> let ctx = SessionContext::new_with_config( SessionConfig::new().with_target_partitions(4), ); - let sql = format!("EXPLAIN ANALYZE {}", array_agg_after_unnest_sql(true)); + let sql = format!( + "EXPLAIN ANALYZE {}", + array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx) + ); let actual = execute_to_batches(&ctx, &sql).await; let formatted = arrow::util::pretty::pretty_format_batches(&actual)? .to_string(); From 329c0c01809e1d47cc826c271fa127ea5916b0d6 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 22:29:14 +0800 Subject: [PATCH 8/8] Refactor regression tests and shared helpers Move ArrayAggOrder SQL fragment selection to enum method. Extract common functionality for SessionContext, EXPLAIN ANALYZE wrapping, physical-plan generation, and grouped substring assertions. Simplify regression tests by removing repeated plan-building and assertion boilerplate. --- datafusion/core/tests/sql/aggregates/basic.rs | 115 ++++++++++-------- 1 file changed, 65 insertions(+), 50 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index b9f3cc961f66..65d092fc96ae 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -21,19 +21,22 @@ use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; use insta::assert_snapshot; +#[derive(Clone, Copy)] enum ArrayAggOrder { OrderedByValIdx, Unordered, } -fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String { - let array_agg = match order { - ArrayAggOrder::OrderedByValIdx => { - "array_agg(val ORDER BY val_idx) AS vals" +impl ArrayAggOrder { + fn array_agg_expr(self) -> &'static str { + match self { + Self::OrderedByValIdx => "array_agg(val ORDER BY val_idx) AS vals", + Self::Unordered => "array_agg(val) AS vals", } - ArrayAggOrder::Unordered => "array_agg(val) AS vals", - }; + } +} +fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String { format!( r#" WITH unnested AS ( @@ -63,10 +66,33 @@ fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String { FROM ranked GROUP BY row_idx ORDER BY row_idx - "# + "#, + array_agg = order.array_agg_expr(), ) } +fn regression_ctx() -> SessionContext { + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)) +} + +fn explain_analyze_sql(sql: &str) -> String { + format!("EXPLAIN ANALYZE {sql}") +} + +async fn formatted_physical_plan(ctx: &SessionContext, sql: &str) -> Result { + let physical_plan = ctx.sql(sql).await?.create_physical_plan().await?; + Ok(displayable(physical_plan.as_ref()).indent(true).to_string()) +} + +fn assert_plan_contains(formatted: &str, required: &[&str], forbidden: &[&str]) { + for needle in required { + assert_contains!(formatted, *needle); + } + for needle in forbidden { + assert_not_contains!(formatted, *needle); + } +} + #[tokio::test] async fn csv_query_array_agg_distinct() -> Result<()> { let ctx = SessionContext::new(); @@ -117,9 +143,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { #[tokio::test] async fn ordered_array_agg_after_unnest_regression() -> Result<()> { - let ctx = SessionContext::new_with_config( - SessionConfig::new().with_target_partitions(4), - ); + let ctx = regression_ctx(); let sql = array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx); let unordered_sql = array_agg_after_unnest_sql(ArrayAggOrder::Unordered); @@ -133,36 +157,36 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { +---------+-----------+ "); - let physical_plan = ctx.sql(&sql).await?.create_physical_plan().await?; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let unordered_physical_plan = - ctx.sql(&unordered_sql).await?.create_physical_plan().await?; - let unordered_formatted = - displayable(unordered_physical_plan.as_ref()).indent(true).to_string(); + let formatted = formatted_physical_plan(&ctx, &sql).await?; + let unordered_formatted = formatted_physical_plan(&ctx, &unordered_sql).await?; - for needle in [ - "UnnestExec", - "BoundedWindowAggExec", - "AggregateExec: mode=SinglePartitioned", - "aggr=[array_agg(ranked.val) ORDER BY [", - "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", - ] { - assert_contains!(&formatted, needle); - } - assert_not_contains!(&formatted, "AggregateExec: mode=Partial"); - assert_not_contains!(&formatted, "AggregateExec: mode=FinalPartitioned"); + assert_plan_contains( + &formatted, + &[ + "UnnestExec", + "BoundedWindowAggExec", + "AggregateExec: mode=SinglePartitioned", + "aggr=[array_agg(ranked.val) ORDER BY [", + "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]", + ], + &[ + "AggregateExec: mode=Partial", + "AggregateExec: mode=FinalPartitioned", + ], + ); // The unordered branch can use the compact GroupsAccumulator path, which also // means `val_idx` is dead and the optimizer prunes the window entirely. - for needle in [ - "UnnestExec", - "AggregateExec: mode=Partial", - "AggregateExec: mode=FinalPartitioned", - "aggr=[array_agg(ranked.val)]", - ] { - assert_contains!(&unordered_formatted, needle); - } - assert_not_contains!(&unordered_formatted, "ORDER BY [ranked.val_idx"); + assert_plan_contains( + &unordered_formatted, + &[ + "UnnestExec", + "AggregateExec: mode=Partial", + "AggregateExec: mode=FinalPartitioned", + "aggr=[array_agg(ranked.val)]", + ], + &["ORDER BY [ranked.val_idx"], + ); Ok(()) } @@ -170,16 +194,11 @@ async fn ordered_array_agg_after_unnest_regression() -> Result<()> { #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> { - let ctx = SessionContext::new_with_config( - SessionConfig::new().with_target_partitions(4), - ); - let sql = format!( - "EXPLAIN ANALYZE {}", - array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx) - ); + let ctx = regression_ctx(); + let sql = + explain_analyze_sql(&array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx)); let actual = execute_to_batches(&ctx, &sql).await; - let formatted = arrow::util::pretty::pretty_format_batches(&actual)? - .to_string(); + let formatted = arrow::util::pretty::pretty_format_batches(&actual)?.to_string(); assert_metrics!( &formatted, @@ -187,11 +206,7 @@ async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> "metrics=[output_rows=5", "output_batches=1" ); - assert_metrics!( - &formatted, - "BoundedWindowAggExec", - "metrics=[output_rows=5" - ); + assert_metrics!(&formatted, "BoundedWindowAggExec", "metrics=[output_rows=5"); assert_metrics!( &formatted, "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]",