Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 151 additions & 97 deletions datafusion/functions-aggregate/benches/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray};
use arrow::array::{ArrayRef, BooleanArray, Int64Array};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Int64Type, Schema};
use arrow::util::bench_util::{create_boolean_array, create_primitive_array};
use datafusion_common::instant::Instant;
use std::hint::black_box;
use std::sync::Arc;

use datafusion_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs,
AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs,
};
use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
use datafusion_physical_expr::PhysicalSortExpr;
Expand Down Expand Up @@ -72,92 +72,113 @@ fn prepare_groups_accumulator(is_first: bool) -> Box<dyn GroupsAccumulator> {
}
}

fn prepare_accumulator(is_first: bool) -> Box<dyn Accumulator> {
let schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Int64, true),
Field::new("ord", DataType::Int64, true),
]));

let order_expr = col("ord", &schema).unwrap();
let sort_expr = PhysicalSortExpr {
expr: order_expr,
options: SortOptions::default(),
};

let value_field: Arc<Field> = Field::new("value", DataType::Int64, true).into();
let accumulator_args = AccumulatorArgs {
return_field: Arc::clone(&value_field),
schema: &schema,
expr_fields: &[value_field],
ignore_nulls: false,
order_bys: std::slice::from_ref(&sort_expr),
is_reversed: false,
name: if is_first {
"FIRST_VALUE(value ORDER BY ord)"
} else {
"LAST_VALUE(value ORDER BY ord)"
},
is_distinct: false,
exprs: &[col("value", &schema).unwrap()],
};

if is_first {
FirstValue::new().accumulator(accumulator_args).unwrap()
} else {
LastValue::new().accumulator(accumulator_args).unwrap()
}
}

#[expect(clippy::needless_pass_by_value)]
fn convert_to_state_bench(
#[expect(clippy::too_many_arguments)]
fn evaluate_bench(
c: &mut Criterion,
is_first: bool,
emit_to: EmitTo,
name: &str,
values: ArrayRef,
ord: ArrayRef,
opt_filter: Option<&BooleanArray>,
num_groups: usize,
) {
let n = values.len();
let group_indices: Vec<usize> = (0..n).map(|i| i % num_groups).collect();

c.bench_function(name, |b| {
b.iter(|| {
let accumulator = prepare_groups_accumulator(is_first);
black_box(
accumulator
.convert_to_state(std::slice::from_ref(&values), opt_filter)
.unwrap(),
)
b.iter_custom(|iters| {
// Every `evaluate` call mutates the accumulator, so prebuild `iters` accumulators
let mut accumulators: Vec<Box<dyn GroupsAccumulator>> = (0..iters)
.map(|_| {
let mut accumulator = prepare_groups_accumulator(is_first);
accumulator
.update_batch(
&[Arc::clone(&values), Arc::clone(&ord)],
&group_indices,
opt_filter,
num_groups,
)
.unwrap();
accumulator
})
.collect();

let start = Instant::now();
for accumulator in &mut accumulators {
black_box(accumulator.evaluate(emit_to).unwrap());
}
start.elapsed()
})
});
}

#[expect(clippy::needless_pass_by_value)]
fn evaluate_accumulator_bench(
fn update_bench(
c: &mut Criterion,
is_first: bool,
name: &str,
values: ArrayRef,
ord: ArrayRef,
opt_filter: Option<&BooleanArray>,
num_groups: usize,
) {
let n = values.len();
let group_indices: Vec<usize> = (0..n).map(|i| i % num_groups).collect();

// Initialize with worst-case ordering so update_batch forces rows comparison for all groups.
let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![
if is_first {
i64::MAX
} else {
i64::MIN
};
n
]));

c.bench_function(name, |b| {
b.iter_batched(
|| {
// setup, not timed
let mut accumulator = prepare_accumulator(is_first);
accumulator
.update_batch(&[Arc::clone(&values), Arc::clone(&ord)])
.unwrap();
accumulator
},
|mut accumulator| black_box(accumulator.evaluate().unwrap()),
criterion::BatchSize::SmallInput,
)
b.iter_custom(|iters| {
// Every `update_batch` call mutates the accumulator, so prebuild `iters` accumulators.
// Each is pre-populated with worst_ord so all is_sets=true before the timed call.
let mut accumulators: Vec<Box<dyn GroupsAccumulator>> = (0..iters)
.map(|_| {
let mut accumulator = prepare_groups_accumulator(is_first);
accumulator
.update_batch(
&[Arc::clone(&values), Arc::clone(&worst_ord)],
&group_indices,
None, // no filter: ensure all groups are initialised
num_groups,
)
.unwrap();
accumulator
})
.collect();

let start = Instant::now();
for accumulator in &mut accumulators {
#[expect(clippy::unit_arg)]
black_box(
accumulator
.update_batch(
&[Arc::clone(&values), Arc::clone(&ord)],
&group_indices,
opt_filter,
num_groups,
)
.unwrap(),
);
}
start.elapsed()
})
});
}

#[expect(clippy::needless_pass_by_value)]
#[expect(clippy::too_many_arguments)]
fn evaluate_bench(
fn merge_bench(
c: &mut Criterion,
is_first: bool,
emit_to: EmitTo,
name: &str,
values: ArrayRef,
ord: ArrayRef,
Expand All @@ -166,25 +187,52 @@ fn evaluate_bench(
) {
let n = values.len();
let group_indices: Vec<usize> = (0..n).map(|i| i % num_groups).collect();
let is_set: ArrayRef = Arc::new(BooleanArray::from(vec![true; n]));

// Initialize with worst-case ordering so update_batch forces rows comparison for all groups.
let worst_ord: ArrayRef = Arc::new(Int64Array::from(vec![
if is_first {
i64::MAX
} else {
i64::MIN
};
n
]));

c.bench_function(name, |b| {
b.iter_batched(
|| {
// setup, not timed
let mut accumulator = prepare_groups_accumulator(is_first);
accumulator
.update_batch(
&[Arc::clone(&values), Arc::clone(&ord)],
&group_indices,
opt_filter,
num_groups,
)
.unwrap();
accumulator
},
|mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()),
criterion::BatchSize::SmallInput,
)
b.iter_custom(|iters| {
// Every `merge_batch` call mutates the accumulator, so prebuild `iters` accumulators
let mut accumulators: Vec<Box<dyn GroupsAccumulator>> = (0..iters)
.map(|_| {
let mut accumulator = prepare_groups_accumulator(is_first);
accumulator
.update_batch(
&[Arc::clone(&values), Arc::clone(&worst_ord)],
&group_indices,
opt_filter,
num_groups,
)
.unwrap();
accumulator
})
.collect();

let start = Instant::now();
for accumulator in &mut accumulators {
#[expect(clippy::unit_arg)]
black_box(
accumulator
.merge_batch(
&[Arc::clone(&values), Arc::clone(&ord), Arc::clone(&is_set)],
&group_indices,
opt_filter,
num_groups,
)
.unwrap(),
);
}
start.elapsed()
})
});
}

Expand All @@ -208,45 +256,51 @@ fn first_last_benchmark(c: &mut Criterion) {
let ord = Arc::new(create_primitive_array::<Int64Type>(N, null_density))
as ArrayRef;

evaluate_accumulator_bench(
c,
is_first,
&format!("{fn_name} evaluate_accumulator_bench nulls={pct}%"),
values.clone(),
ord.clone(),
);

for with_filter in [false, true] {
let filter = create_boolean_array(N, 0.0, 0.5);
let opt_filter = if with_filter { Some(&filter) } else { None };

convert_to_state_bench(
evaluate_bench(
c,
is_first,
EmitTo::First(2),
&format!(
"{fn_name} convert_to_state nulls={pct}%, filter={with_filter}"
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)"
),
values.clone(),
ord.clone(),
opt_filter,
NUM_GROUPS,
);
evaluate_bench(
c,
is_first,
EmitTo::First(2),
EmitTo::All,
&format!(
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)"
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all"
),
values.clone(),
ord.clone(),
opt_filter,
NUM_GROUPS,
);
evaluate_bench(

update_bench(
c,
is_first,
EmitTo::All,
&format!(
"{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all"
"{fn_name} update_bench nulls={pct}%, filter={with_filter}, first(2)"
),
values.clone(),
ord.clone(),
opt_filter,
NUM_GROUPS,
);
merge_bench(
c,
is_first,
&format!(
"{fn_name} merge_bench nulls={pct}%, filter={with_filter}, first(2)"
),
values.clone(),
ord.clone(),
Expand Down
Loading