From 97beaa1760208f8399b2ab40427f2fc478a1a803 Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 8 Apr 2026 21:10:53 +0100 Subject: [PATCH 1/6] Remove noisy benchmarks for first_last --- .../functions-aggregate/benches/first_last.rs | 99 +------------------ 1 file changed, 1 insertion(+), 98 deletions(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index 6070470138aa..be8622e0bdb4 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -24,7 +24,7 @@ use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, + AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, }; use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; use datafusion_physical_expr::PhysicalSortExpr; @@ -72,86 +72,6 @@ fn prepare_groups_accumulator(is_first: bool) -> Box { } } -fn prepare_accumulator(is_first: bool) -> Box { - let schema = Arc::new(Schema::new(vec![ - Field::new("value", DataType::Int64, true), - Field::new("ord", DataType::Int64, true), - ])); - - let order_expr = col("ord", &schema).unwrap(); - let sort_expr = PhysicalSortExpr { - expr: order_expr, - options: SortOptions::default(), - }; - - let value_field: Arc = Field::new("value", DataType::Int64, true).into(); - let accumulator_args = AccumulatorArgs { - return_field: Arc::clone(&value_field), - schema: &schema, - expr_fields: &[value_field], - ignore_nulls: false, - order_bys: std::slice::from_ref(&sort_expr), - is_reversed: false, - name: if is_first { - "FIRST_VALUE(value ORDER BY ord)" - } else { - "LAST_VALUE(value ORDER BY ord)" - }, - is_distinct: false, - exprs: &[col("value", &schema).unwrap()], - }; - - if is_first { - FirstValue::new().accumulator(accumulator_args).unwrap() - } else { - LastValue::new().accumulator(accumulator_args).unwrap() - } -} - -#[expect(clippy::needless_pass_by_value)] -fn convert_to_state_bench( - c: &mut Criterion, - is_first: bool, - name: &str, - values: ArrayRef, - opt_filter: Option<&BooleanArray>, -) { - c.bench_function(name, |b| { - b.iter(|| { - let accumulator = prepare_groups_accumulator(is_first); - black_box( - accumulator - .convert_to_state(std::slice::from_ref(&values), opt_filter) - .unwrap(), - ) - }) - }); -} - -#[expect(clippy::needless_pass_by_value)] -fn evaluate_accumulator_bench( - c: &mut Criterion, - is_first: bool, - name: &str, - values: ArrayRef, - ord: ArrayRef, -) { - c.bench_function(name, |b| { - b.iter_batched( - || { - // setup, not timed - let mut accumulator = prepare_accumulator(is_first); - accumulator - .update_batch(&[Arc::clone(&values), Arc::clone(&ord)]) - .unwrap(); - accumulator - }, - |mut accumulator| black_box(accumulator.evaluate().unwrap()), - criterion::BatchSize::SmallInput, - ) - }); -} - #[expect(clippy::needless_pass_by_value)] #[expect(clippy::too_many_arguments)] fn evaluate_bench( @@ -208,27 +128,10 @@ fn first_last_benchmark(c: &mut Criterion) { let ord = Arc::new(create_primitive_array::(N, null_density)) as ArrayRef; - evaluate_accumulator_bench( - c, - is_first, - &format!("{fn_name} evaluate_accumulator_bench nulls={pct}%"), - values.clone(), - ord.clone(), - ); - for with_filter in [false, true] { let filter = create_boolean_array(N, 0.0, 0.5); let opt_filter = if with_filter { Some(&filter) } else { None }; - convert_to_state_bench( - c, - is_first, - &format!( - "{fn_name} convert_to_state nulls={pct}%, filter={with_filter}" - ), - values.clone(), - opt_filter, - ); evaluate_bench( c, is_first, From fbef4057621679fcfa2faedc0400dc27ec2865a3 Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 8 Apr 2026 21:33:35 +0100 Subject: [PATCH 2/6] Measure using iter_custom --- .../functions-aggregate/benches/first_last.rs | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index be8622e0bdb4..e5928fea3813 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::hint::black_box; -use std::sync::Arc; - use arrow::array::{ArrayRef, BooleanArray}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; +use std::hint::black_box; +use std::sync::Arc; +use std::time::Instant; use datafusion_expr::{ AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, @@ -88,23 +88,29 @@ fn evaluate_bench( let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); c.bench_function(name, |b| { - b.iter_batched( - || { - // setup, not timed - let mut accumulator = prepare_groups_accumulator(is_first); - accumulator - .update_batch( - &[Arc::clone(&values), Arc::clone(&ord)], - &group_indices, - opt_filter, - num_groups, - ) - .unwrap(); - accumulator - }, - |mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()), - criterion::BatchSize::SmallInput, - ) + b.iter_custom(|iters| { + // Every `evaluate` call mutates the accumulator, so prebuild `iters` accumulators + let mut accumulators: Vec> = (0..iters) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(); + accumulator + }) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + black_box(accumulator.evaluate(emit_to).unwrap()); + } + start.elapsed() + }) }); } From f6216a3e4dab5bc18c56a9192dd648f20fb604b1 Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 8 Apr 2026 22:01:39 +0100 Subject: [PATCH 3/6] Add benches for update_batch, merge_batch --- .../functions-aggregate/benches/first_last.rs | 115 +++++++++++++++++- 1 file changed, 114 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index e5928fea3813..c7207a6007d1 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -19,9 +19,9 @@ use arrow::array::{ArrayRef, BooleanArray}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; +use datafusion_common::instant::Instant; use std::hint::black_box; use std::sync::Arc; -use std::time::Instant; use datafusion_expr::{ AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, @@ -114,6 +114,96 @@ fn evaluate_bench( }); } +#[expect(clippy::needless_pass_by_value)] +fn update_bench( + c: &mut Criterion, + is_first: bool, + name: &str, + values: ArrayRef, + ord: ArrayRef, + opt_filter: Option<&BooleanArray>, + num_groups: usize, +) { + let n = values.len(); + let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + // Every `update_bench` call mutates the accumulator, so prebuild `iters` accumulators + let mut accumulators: Vec> = (0..iters) + .map(|_| prepare_groups_accumulator(is_first)) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + #[expect(clippy::unit_arg)] + black_box( + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(), + ); + } + start.elapsed() + }) + }); +} + +#[expect(clippy::needless_pass_by_value)] +fn merge_bench( + c: &mut Criterion, + is_first: bool, + name: &str, + values: ArrayRef, + ord: ArrayRef, + opt_filter: Option<&BooleanArray>, + num_groups: usize, +) { + let n = values.len(); + let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + let is_set: ArrayRef = Arc::new(BooleanArray::from(vec![true; n])); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + // Every `merge_batch` call mutates the accumulator, so prebuild `iters` accumulators + let mut accumulators: Vec> = (0..iters) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(); + accumulator + }) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + #[expect(clippy::unit_arg)] + black_box( + accumulator + .merge_batch( + &[Arc::clone(&values), Arc::clone(&ord), Arc::clone(&is_set)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(), + ); + } + start.elapsed() + }) + }); +} + fn first_last_benchmark(c: &mut Criterion) { const N: usize = 65536; const NUM_GROUPS: usize = 1024; @@ -162,6 +252,29 @@ fn first_last_benchmark(c: &mut Criterion) { opt_filter, NUM_GROUPS, ); + + update_bench( + c, + is_first, + &format!( + "{fn_name} update_bench nulls={pct}%, filter={with_filter}, first(2)" + ), + values.clone(), + ord.clone(), + opt_filter, + NUM_GROUPS, + ); + merge_bench( + c, + is_first, + &format!( + "{fn_name} merge_bench nulls={pct}%, filter={with_filter}, first(2)" + ), + values.clone(), + ord.clone(), + opt_filter, + NUM_GROUPS, + ); } } } From f1e327ea446f173c2f14e83fc3e764fc370d602e Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 8 Apr 2026 23:05:23 +0100 Subject: [PATCH 4/6] Prepopulate with worst-case ordering --- .../functions-aggregate/benches/first_last.rs | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index c7207a6007d1..cb4946342177 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray, Int64Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; @@ -127,11 +127,33 @@ fn update_bench( let n = values.len(); let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + // Initialize with worst-case ordering so update_batch forces rows comparison for all groups. + let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![ + if is_first { + i64::MAX + } else { + i64::MIN + }; + n + ])); + c.bench_function(name, |b| { b.iter_custom(|iters| { - // Every `update_bench` call mutates the accumulator, so prebuild `iters` accumulators + // Every `update_batch` call mutates the accumulator, so prebuild `iters` accumulators. + // Each is pre-populated with worst_ord so all is_sets=true before the timed call. let mut accumulators: Vec> = (0..iters) - .map(|_| prepare_groups_accumulator(is_first)) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&worst_ord)], + &group_indices, + None, // no filter: ensure all groups are initialised + num_groups, + ) + .unwrap(); + accumulator + }) .collect(); let start = Instant::now(); @@ -167,6 +189,16 @@ fn merge_bench( let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); let is_set: ArrayRef = Arc::new(BooleanArray::from(vec![true; n])); + // Initialize with worst-case ordering so update_batch forces rows comparison for all groups. + let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![ + if is_first { + i64::MAX + } else { + i64::MIN + }; + n + ])); + c.bench_function(name, |b| { b.iter_custom(|iters| { // Every `merge_batch` call mutates the accumulator, so prebuild `iters` accumulators @@ -175,7 +207,7 @@ fn merge_bench( let mut accumulator = prepare_groups_accumulator(is_first); accumulator .update_batch( - &[Arc::clone(&values), Arc::clone(&ord)], + &[Arc::clone(&values), Arc::clone(&worst_ord)], &group_indices, opt_filter, num_groups, From 5d98dc076076b7a9c03ee1eebf9f658404244331 Mon Sep 17 00:00:00 2001 From: theirix Date: Thu, 9 Apr 2026 21:28:30 +0100 Subject: [PATCH 5/6] Fixup bench name --- datafusion/functions-aggregate/benches/first_last.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index cb4946342177..1e73d9c0ca05 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -288,9 +288,7 @@ fn first_last_benchmark(c: &mut Criterion) { update_bench( c, is_first, - &format!( - "{fn_name} update_bench nulls={pct}%, filter={with_filter}, first(2)" - ), + &format!("{fn_name} update_bench nulls={pct}%, filter={with_filter}"), values.clone(), ord.clone(), opt_filter, @@ -299,9 +297,7 @@ fn first_last_benchmark(c: &mut Criterion) { merge_bench( c, is_first, - &format!( - "{fn_name} merge_bench nulls={pct}%, filter={with_filter}, first(2)" - ), + &format!("{fn_name} merge_bench nulls={pct}%, filter={with_filter}"), values.clone(), ord.clone(), opt_filter, From 4f29c615996d2a2a7864910f5b4b8fdd2a170d57 Mon Sep 17 00:00:00 2001 From: theirix Date: Thu, 9 Apr 2026 21:49:41 +0100 Subject: [PATCH 6/6] Add bench for Accumulator --- .../functions-aggregate/benches/first_last.rs | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index 1e73d9c0ca05..b3bcaa3746b9 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -24,9 +24,11 @@ use std::hint::black_box; use std::sync::Arc; use datafusion_expr::{ - AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, + Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, +}; +use datafusion_functions_aggregate::first_last::{ + FirstValue, LastValue, TrivialFirstValueAccumulator, TrivialLastValueAccumulator, }; -use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::expressions::col; @@ -72,6 +74,22 @@ fn prepare_groups_accumulator(is_first: bool) -> Box { } } +fn create_trivial_accumulator( + is_first: bool, + ignore_nulls: bool, +) -> Box { + if is_first { + Box::new( + TrivialFirstValueAccumulator::try_new(&DataType::Int64, ignore_nulls) + .unwrap(), + ) + } else { + Box::new( + TrivialLastValueAccumulator::try_new(&DataType::Int64, ignore_nulls).unwrap(), + ) + } +} + #[expect(clippy::needless_pass_by_value)] #[expect(clippy::too_many_arguments)] fn evaluate_bench( @@ -236,6 +254,30 @@ fn merge_bench( }); } +#[expect(clippy::needless_pass_by_value)] +fn trivial_update_bench( + c: &mut Criterion, + is_first: bool, + ignore_nulls: bool, + name: &str, + values: ArrayRef, +) { + c.bench_function(name, |b| { + b.iter_custom(|iters| { + // The bench is way too fast, so apply 10x factor + let mut accumulators: Vec> = (0..iters * 10) + .map(|_| create_trivial_accumulator(is_first, ignore_nulls)) + .collect(); + let start = Instant::now(); + for acc in &mut accumulators { + #[expect(clippy::unit_arg)] + black_box(acc.update_batch(&[Arc::clone(&values)]).unwrap()); + } + start.elapsed() + }) + }); +} + fn first_last_benchmark(c: &mut Criterion) { const N: usize = 65536; const NUM_GROUPS: usize = 1024; @@ -304,6 +346,18 @@ fn first_last_benchmark(c: &mut Criterion) { NUM_GROUPS, ); } + + for ignore_nulls in [false, true] { + trivial_update_bench( + c, + is_first, + ignore_nulls, + &format!( + "{fn_name} trivial_update_bench nulls={pct}%, ignore_nulls={ignore_nulls}" + ), + values.clone(), + ); + } } } }