diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index 6070470138aa..cb4946342177 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::hint::black_box; -use std::sync::Arc; - -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray, Int64Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; +use datafusion_common::instant::Instant; +use std::hint::black_box; +use std::sync::Arc; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, + AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, }; use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; use datafusion_physical_expr::PhysicalSortExpr; @@ -72,92 +72,113 @@ fn prepare_groups_accumulator(is_first: bool) -> Box { } } -fn prepare_accumulator(is_first: bool) -> Box { - let schema = Arc::new(Schema::new(vec![ - Field::new("value", DataType::Int64, true), - Field::new("ord", DataType::Int64, true), - ])); - - let order_expr = col("ord", &schema).unwrap(); - let sort_expr = PhysicalSortExpr { - expr: order_expr, - options: SortOptions::default(), - }; - - let value_field: Arc = Field::new("value", DataType::Int64, true).into(); - let accumulator_args = AccumulatorArgs { - return_field: Arc::clone(&value_field), - schema: &schema, - expr_fields: &[value_field], - ignore_nulls: false, - order_bys: std::slice::from_ref(&sort_expr), - is_reversed: false, - name: if is_first { - "FIRST_VALUE(value ORDER BY ord)" - } else { - "LAST_VALUE(value ORDER BY ord)" - }, - is_distinct: false, - exprs: &[col("value", &schema).unwrap()], - }; - - if is_first { - FirstValue::new().accumulator(accumulator_args).unwrap() - } else { - LastValue::new().accumulator(accumulator_args).unwrap() - } -} - #[expect(clippy::needless_pass_by_value)] -fn convert_to_state_bench( +#[expect(clippy::too_many_arguments)] +fn evaluate_bench( c: &mut Criterion, is_first: bool, + emit_to: EmitTo, name: &str, values: ArrayRef, + ord: ArrayRef, opt_filter: Option<&BooleanArray>, + num_groups: usize, ) { + let n = values.len(); + let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + c.bench_function(name, |b| { - b.iter(|| { - let accumulator = prepare_groups_accumulator(is_first); - black_box( - accumulator - .convert_to_state(std::slice::from_ref(&values), opt_filter) - .unwrap(), - ) + b.iter_custom(|iters| { + // Every `evaluate` call mutates the accumulator, so prebuild `iters` accumulators + let mut accumulators: Vec> = (0..iters) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(); + accumulator + }) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + black_box(accumulator.evaluate(emit_to).unwrap()); + } + start.elapsed() }) }); } #[expect(clippy::needless_pass_by_value)] -fn evaluate_accumulator_bench( +fn update_bench( c: &mut Criterion, is_first: bool, name: &str, values: ArrayRef, ord: ArrayRef, + opt_filter: Option<&BooleanArray>, + num_groups: usize, ) { + let n = values.len(); + let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + + // Initialize with worst-case ordering so update_batch forces rows comparison for all groups. + let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![ + if is_first { + i64::MAX + } else { + i64::MIN + }; + n + ])); + c.bench_function(name, |b| { - b.iter_batched( - || { - // setup, not timed - let mut accumulator = prepare_accumulator(is_first); - accumulator - .update_batch(&[Arc::clone(&values), Arc::clone(&ord)]) - .unwrap(); - accumulator - }, - |mut accumulator| black_box(accumulator.evaluate().unwrap()), - criterion::BatchSize::SmallInput, - ) + b.iter_custom(|iters| { + // Every `update_batch` call mutates the accumulator, so prebuild `iters` accumulators. + // Each is pre-populated with worst_ord so all is_sets=true before the timed call. + let mut accumulators: Vec> = (0..iters) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&worst_ord)], + &group_indices, + None, // no filter: ensure all groups are initialised + num_groups, + ) + .unwrap(); + accumulator + }) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + #[expect(clippy::unit_arg)] + black_box( + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(), + ); + } + start.elapsed() + }) }); } #[expect(clippy::needless_pass_by_value)] -#[expect(clippy::too_many_arguments)] -fn evaluate_bench( +fn merge_bench( c: &mut Criterion, is_first: bool, - emit_to: EmitTo, name: &str, values: ArrayRef, ord: ArrayRef, @@ -166,25 +187,52 @@ fn evaluate_bench( ) { let n = values.len(); let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + let is_set: ArrayRef = Arc::new(BooleanArray::from(vec![true; n])); + + // Initialize with worst-case ordering so update_batch forces rows comparison for all groups. + let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![ + if is_first { + i64::MAX + } else { + i64::MIN + }; + n + ])); c.bench_function(name, |b| { - b.iter_batched( - || { - // setup, not timed - let mut accumulator = prepare_groups_accumulator(is_first); - accumulator - .update_batch( - &[Arc::clone(&values), Arc::clone(&ord)], - &group_indices, - opt_filter, - num_groups, - ) - .unwrap(); - accumulator - }, - |mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()), - criterion::BatchSize::SmallInput, - ) + b.iter_custom(|iters| { + // Every `merge_batch` call mutates the accumulator, so prebuild `iters` accumulators + let mut accumulators: Vec> = (0..iters) + .map(|_| { + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&worst_ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(); + accumulator + }) + .collect(); + + let start = Instant::now(); + for accumulator in &mut accumulators { + #[expect(clippy::unit_arg)] + black_box( + accumulator + .merge_batch( + &[Arc::clone(&values), Arc::clone(&ord), Arc::clone(&is_set)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(), + ); + } + start.elapsed() + }) }); } @@ -208,45 +256,51 @@ fn first_last_benchmark(c: &mut Criterion) { let ord = Arc::new(create_primitive_array::(N, null_density)) as ArrayRef; - evaluate_accumulator_bench( - c, - is_first, - &format!("{fn_name} evaluate_accumulator_bench nulls={pct}%"), - values.clone(), - ord.clone(), - ); - for with_filter in [false, true] { let filter = create_boolean_array(N, 0.0, 0.5); let opt_filter = if with_filter { Some(&filter) } else { None }; - convert_to_state_bench( + evaluate_bench( c, is_first, + EmitTo::First(2), &format!( - "{fn_name} convert_to_state nulls={pct}%, filter={with_filter}" + "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)" ), values.clone(), + ord.clone(), opt_filter, + NUM_GROUPS, ); evaluate_bench( c, is_first, - EmitTo::First(2), + EmitTo::All, &format!( - "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)" + "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all" ), values.clone(), ord.clone(), opt_filter, NUM_GROUPS, ); - evaluate_bench( + + update_bench( c, is_first, - EmitTo::All, &format!( - "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all" + "{fn_name} update_bench nulls={pct}%, filter={with_filter}, first(2)" + ), + values.clone(), + ord.clone(), + opt_filter, + NUM_GROUPS, + ); + merge_bench( + c, + is_first, + &format!( + "{fn_name} merge_bench nulls={pct}%, filter={with_filter}, first(2)" ), values.clone(), ord.clone(),