Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions datafusion/core/tests/sql/aggregates/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,78 @@ use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
use insta::assert_snapshot;

/// Which flavor of `array_agg` the regression query should exercise:
/// the order-sensitive accumulator path or the unordered fast path.
#[derive(Clone, Copy)]
enum ArrayAggOrder {
    OrderedByValIdx,
    Unordered,
}

impl ArrayAggOrder {
    /// Returns the `array_agg` projection expression for this variant.
    fn array_agg_expr(self) -> &'static str {
        if matches!(self, Self::OrderedByValIdx) {
            "array_agg(val ORDER BY val_idx) AS vals"
        } else {
            "array_agg(val) AS vals"
        }
    }
}

/// Builds the regression SQL: unnest two small lists alongside a generated
/// element ordinal, rank each element with `ROW_NUMBER` partitioned by row,
/// then re-aggregate with the requested `array_agg` flavor so the result can
/// be checked against the original list order.
fn array_agg_after_unnest_sql(order: ArrayAggOrder) -> String {
    // Resolve the projection expression up front; only this part varies.
    let array_agg = order.array_agg_expr();
    format!(
        r#"
WITH unnested AS (
    SELECT
        row_idx,
        unnest(vals) AS val,
        unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx
    FROM (
        VALUES
            (1, [3, 1, 2]),
            (2, [5, 4])
    ) AS t(row_idx, vals)
),
ranked AS (
    SELECT
        row_idx,
        ROW_NUMBER() OVER (
            PARTITION BY row_idx
            ORDER BY original_idx
        ) AS val_idx,
        val
    FROM unnested
)
SELECT
    row_idx,
    {array_agg}
FROM ranked
GROUP BY row_idx
ORDER BY row_idx
"#
    )
}

/// Session fixture for the regression tests: forces 4 target partitions so
/// the planner's repartitioning and ordering decisions are exercised.
fn regression_ctx() -> SessionContext {
    let config = SessionConfig::new().with_target_partitions(4);
    SessionContext::new_with_config(config)
}

/// Prefixes `sql` with `EXPLAIN ANALYZE` so executing it yields the plan
/// annotated with runtime metrics instead of the query results.
fn explain_analyze_sql(sql: &str) -> String {
    const PREFIX: &str = "EXPLAIN ANALYZE ";
    let mut out = String::with_capacity(PREFIX.len() + sql.len());
    out.push_str(PREFIX);
    out.push_str(sql);
    out
}

/// Plans `sql` against `ctx` and renders the resulting physical plan as an
/// indented, multi-line string suitable for substring assertions.
async fn formatted_physical_plan(ctx: &SessionContext, sql: &str) -> Result<String> {
    let dataframe = ctx.sql(sql).await?;
    let plan = dataframe.create_physical_plan().await?;
    let rendered = displayable(plan.as_ref()).indent(true).to_string();
    Ok(rendered)
}

/// Asserts that `formatted` mentions every needle in `required` and none of
/// the needles in `forbidden`; panics with context on the first violation.
fn assert_plan_contains(formatted: &str, required: &[&str], forbidden: &[&str]) {
    required
        .iter()
        .for_each(|needle| assert_contains!(formatted, *needle));
    forbidden
        .iter()
        .for_each(|needle| assert_not_contains!(formatted, *needle));
}

#[tokio::test]
async fn csv_query_array_agg_distinct() -> Result<()> {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -69,6 +141,87 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
Ok(())
}

/// Regression test (issue 20788): `array_agg(val ORDER BY val_idx)` applied
/// after an unnest + row_number ranking must (a) return each list in its
/// original element order and (b) keep the single-stage ordered aggregation
/// shape in the physical plan, while the unordered variant is free to use the
/// two-stage Partial/FinalPartitioned path.
#[tokio::test]
async fn ordered_array_agg_after_unnest_regression() -> Result<()> {
    // 4 target partitions so repartitioning/ordering decisions are exercised.
    let ctx = regression_ctx();
    let sql = array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx);
    let unordered_sql = array_agg_after_unnest_sql(ArrayAggOrder::Unordered);

    // Correctness: the aggregated lists must come back in original order.
    let results = execute_to_batches(&ctx, &sql).await;
    assert_snapshot!(batches_to_sort_string(&results), @r"
    +---------+-----------+
    | row_idx | vals |
    +---------+-----------+
    | 1 | [3, 1, 2] |
    | 2 | [5, 4] |
    +---------+-----------+
    ");

    let formatted = formatted_physical_plan(&ctx, &sql).await?;
    let unordered_formatted = formatted_physical_plan(&ctx, &unordered_sql).await?;

    // Plan shape: the ordered aggregate must run as a single SinglePartitioned
    // stage below the window + sort, and must NOT be split into the
    // Partial/FinalPartitioned pair used by the unordered fast path.
    assert_plan_contains(
        &formatted,
        &[
            "UnnestExec",
            "BoundedWindowAggExec",
            "AggregateExec: mode=SinglePartitioned",
            "aggr=[array_agg(ranked.val) ORDER BY [",
            "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]",
        ],
        &[
            "AggregateExec: mode=Partial",
            "AggregateExec: mode=FinalPartitioned",
        ],
    );

    // The unordered branch can use the compact GroupsAccumulator path, which also
    // means `val_idx` is dead and the optimizer prunes the window entirely.
    assert_plan_contains(
        &unordered_formatted,
        &[
            "UnnestExec",
            "AggregateExec: mode=Partial",
            "AggregateExec: mode=FinalPartitioned",
            "aggr=[array_agg(ranked.val)]",
        ],
        &["ORDER BY [ranked.val_idx"],
    );

    Ok(())
}

/// Runs the ordered regression query under `EXPLAIN ANALYZE` and checks the
/// runtime metrics of each operator in the pipeline: 5 rows flow out of the
/// unnest / window / sort (3 + 2 unnested elements) and the final aggregate
/// emits 2 groups while reporting its memory usage.
#[tokio::test]
// Skipped under tarpaulin: coverage instrumentation perturbs the metrics.
#[cfg_attr(tarpaulin, ignore)]
async fn ordered_array_agg_after_unnest_explain_analyze_metrics() -> Result<()> {
    let ctx = regression_ctx();
    let sql =
        explain_analyze_sql(&array_agg_after_unnest_sql(ArrayAggOrder::OrderedByValIdx));
    // EXPLAIN ANALYZE output is itself a batch; render it to text so the
    // metric needles can be matched per-operator.
    let actual = execute_to_batches(&ctx, &sql).await;
    let formatted = arrow::util::pretty::pretty_format_batches(&actual)?.to_string();

    // Unnest expands [3,1,2] and [5,4] into 5 rows in a single batch.
    assert_metrics!(
        &formatted,
        "UnnestExec",
        "metrics=[output_rows=5",
        "output_batches=1"
    );
    // The window and sort both see all 5 unnested rows.
    assert_metrics!(&formatted, "BoundedWindowAggExec", "metrics=[output_rows=5");
    assert_metrics!(
        &formatted,
        "SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST]",
        "metrics=[output_rows=5"
    );
    // The ordered aggregate collapses to 2 groups and tracks peak memory.
    assert_metrics!(
        &formatted,
        "AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx]",
        "metrics=[output_rows=2",
        "peak_mem_used="
    );

    Ok(())
}

#[tokio::test]
async fn count_partitioned() -> Result<()> {
let results =
Expand Down
94 changes: 94 additions & 0 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,100 @@ physical_plan
09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

# The unordered case above is not sufficient coverage for issue 20788:
# adding ORDER BY to array_agg exercises the order-sensitive accumulator path
# rather than the compact GroupsAccumulator fast path used by unordered array_agg.
# This variant also uses an explicit row id plus a generated element ordinal,
# then computes row_number over that ordinal so the explain plan still pins the
# windowed execution shape from the issue while validating original list order.
query TT
EXPLAIN WITH unnested AS (
SELECT
row_idx,
unnest(vals) AS val,
unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx
FROM (
VALUES
(1, [3, 1, 2]),
(2, [5, 4])
) AS t(row_idx, vals)
),
ranked AS (
SELECT
row_idx,
ROW_NUMBER() OVER (
PARTITION BY row_idx
ORDER BY original_idx
) AS val_idx,
val
FROM unnested
)
SELECT
row_idx,
array_agg(val ORDER BY val_idx) AS vals
FROM ranked
GROUP BY row_idx
ORDER BY row_idx;
----
logical_plan
01)Sort: ranked.row_idx ASC NULLS LAST
02)--Projection: ranked.row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST] AS vals
03)----Aggregate: groupBy=[[ranked.row_idx]], aggr=[[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]]]
04)------SubqueryAlias: ranked
05)--------Projection: unnested.row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS val_idx, unnested.val
06)----------WindowAggr: windowExpr=[[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
07)------------SubqueryAlias: unnested
08)--------------Projection: t.row_idx, __unnest_placeholder(t.vals,depth=1) AS UNNEST(t.vals) AS val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1) AS UNNEST(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64")))) AS original_idx
09)----------------Unnest: lists[__unnest_placeholder(t.vals)|depth=1, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))|depth=1] structs[]
10)------------------Projection: t.row_idx, t.vals AS __unnest_placeholder(t.vals), range(Int64(1), CAST(CAST(cardinality(t.vals) AS Decimal128(20, 0)) + Decimal128(Some(1),20,0) AS Int64)) AS __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))
11)--------------------SubqueryAlias: t
12)----------------------Projection: column1 AS row_idx, column2 AS vals
13)------------------------Values: (Int64(1), List([3, 1, 2]) AS make_array(Int64(3),Int64(1),Int64(2))), (Int64(2), List([5, 4]) AS make_array(Int64(5),Int64(4)))
physical_plan
01)SortPreservingMergeExec: [row_idx@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[row_idx@0 as row_idx, array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]@1 as vals]
03)----AggregateExec: mode=SinglePartitioned, gby=[row_idx@0 as row_idx], aggr=[array_agg(ranked.val) ORDER BY [ranked.val_idx ASC NULLS LAST]], ordering_mode=Sorted
04)------ProjectionExec: expr=[row_idx@0 as row_idx, row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as val_idx, val@1 as val]
05)--------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [unnested.row_idx] ORDER BY [unnested.original_idx ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
06)----------SortExec: expr=[row_idx@0 ASC NULLS LAST, original_idx@2 ASC NULLS LAST], preserve_partitioning=[true]
07)------------RepartitionExec: partitioning=Hash([row_idx@0], 4), input_partitions=1
08)--------------ProjectionExec: expr=[row_idx@0 as row_idx, __unnest_placeholder(t.vals,depth=1)@1 as val, __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))),depth=1)@2 as original_idx]
09)----------------UnnestExec
10)------------------ProjectionExec: expr=[column1@0 as row_idx, column2@1 as __unnest_placeholder(t.vals), range(1, CAST(CAST(cardinality(column2@1) AS Decimal128(20, 0)) + Some(1),20,0 AS Int64)) as __unnest_placeholder(range(Int64(1),arrow_cast(cardinality(t.vals) + Int64(1),Utf8("Int64"))))]
11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]

query I?
WITH unnested AS (
SELECT
row_idx,
unnest(vals) AS val,
unnest(range(1, arrow_cast(cardinality(vals) + 1, 'Int64'))) AS original_idx
FROM (
VALUES
(1, [3, 1, 2]),
(2, [5, 4])
) AS t(row_idx, vals)
),
ranked AS (
SELECT
row_idx,
ROW_NUMBER() OVER (
PARTITION BY row_idx
ORDER BY original_idx
) AS val_idx,
val
FROM unnested
)
SELECT
row_idx,
array_agg(val ORDER BY val_idx) AS vals
FROM ranked
GROUP BY row_idx
ORDER BY row_idx;
----
1 [3, 1, 2]
2 [5, 4]

# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
statement ok
COPY (
Expand Down
Loading