From eb7b3c85d769a40fc4965fd496a91002ec67ae00 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 23:03:39 +0800 Subject: [PATCH 01/13] Refactor StringAggGroupsAccumulator for efficiency Optimize StringAggGroupsAccumulator to retain input and state batches with metadata instead of building a Vec> on every update. Assemble concatenated strings lazily in evaluate() and state(). Adjust size() to reflect retained arrays and metadata. Support EmitTo::First(n) by emitting the required prefix and renumbering retained groups. Include note for future mixed-batch compaction work. --- .../functions-aggregate/src/string_agg.rs | 191 ++++++++++++++---- 1 file changed, 153 insertions(+), 38 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index e62b26da99cc..8dfec52fc663 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::array_agg::ArrayAgg; -use arrow::array::{ArrayRef, AsArray, BooleanArray, LargeStringArray}; +use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, LargeStringArray}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; use datafusion_common::{ @@ -323,46 +323,132 @@ fn filter_index(values: &[T], index: usize) -> Vec { struct StringAggGroupsAccumulator { /// The delimiter placed between concatenated values. delimiter: String, - /// Accumulated string per group. `None` means no values have been seen - /// (the group's output will be NULL). - /// A potential improvement is to avoid this String allocation - /// See - values: Vec>, - /// Running total of string data bytes across all groups. - total_data_bytes: usize, + /// Source arrays retained from input batches or merged state batches. + batches: Vec, + /// Per-batch `(group_idx, row_idx)` pairs for non-null rows. + batch_entries: Vec>, + /// Total number of groups tracked. + num_groups: usize, } impl StringAggGroupsAccumulator { fn new(delimiter: String) -> Self { Self { delimiter, - values: Vec::new(), - total_data_bytes: 0, + batches: Vec::new(), + batch_entries: Vec::new(), + num_groups: 0, } } - fn append_batch<'a>( + fn clear_state(&mut self) { + // `size()` measures Vec capacity rather than len, so allocate new + // buffers instead of using `clear()`. + self.batches = Vec::new(); + self.batch_entries = Vec::new(); + self.num_groups = 0; + } + + fn retain_after_emit(&mut self, emit_groups: usize) { + let emit_groups = emit_groups as u32; + let mut retained_batches = Vec::with_capacity(self.batches.len()); + let mut retained_entries = Vec::with_capacity(self.batch_entries.len()); + + for (batch, mut entries) in + self.batches.drain(..).zip(self.batch_entries.drain(..)) + { + entries.retain(|(group_idx, _)| *group_idx >= emit_groups); + if entries.is_empty() { + continue; + } + + // Keep the original arrays for this prototype and only renumber + // retained groups. SUB_ISSUE_04 will compact mixed batches so + // partially emitted batches no longer pin their full inputs. + for (group_idx, _) in &mut entries { + *group_idx -= emit_groups; + } + + retained_batches.push(batch); + retained_entries.push(entries); + } + + self.batches = retained_batches; + self.batch_entries = retained_entries; + self.num_groups -= emit_groups as usize; + } + + fn append_rows<'a>( &mut self, iter: impl Iterator>, group_indices: &[usize], - ) { - for (opt_value, &group_idx) in iter.zip(group_indices.iter()) { - if let Some(value) = opt_value { - match &mut self.values[group_idx] { + ) -> Vec<(u32, u32)> { + let mut entries = Vec::new(); + + for (row_idx, (opt_value, &group_idx)) in + iter.zip(group_indices.iter()).enumerate() + { + if opt_value.is_some() { + entries.push((group_idx as u32, row_idx as u32)); + } + } + + entries + } + + fn append_batch_values( + values: &mut [Option], + entries: &[(u32, u32)], + array: &ArrayRef, + delimiter: &str, + emit_groups: usize, + ) -> Result<()> { + let append_value = + |values: &mut [Option], group_idx: usize, value: &str| { + match &mut values[group_idx] { Some(existing) => { - let added = self.delimiter.len() + value.len(); - existing.reserve(added); - existing.push_str(&self.delimiter); + existing.push_str(delimiter); existing.push_str(value); - self.total_data_bytes += added; } - slot @ None => { - *slot = Some(value.to_string()); - self.total_data_bytes += value.len(); + slot @ None => *slot = Some(value.to_string()), + } + }; + + match array.data_type() { + DataType::Utf8 => { + let array = array.as_string::(); + for &(group_idx, row_idx) in entries { + let group_idx = group_idx as usize; + if group_idx >= emit_groups || array.is_null(row_idx as usize) { + continue; + } + append_value(values, group_idx, array.value(row_idx as usize)); + } + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + for &(group_idx, row_idx) in entries { + let group_idx = group_idx as usize; + if group_idx >= emit_groups || array.is_null(row_idx as usize) { + continue; + } + append_value(values, group_idx, array.value(row_idx as usize)); + } + } + DataType::Utf8View => { + let array = array.as_string_view(); + for &(group_idx, row_idx) in entries { + let group_idx = group_idx as usize; + if group_idx >= emit_groups || array.is_null(row_idx as usize) { + continue; } + append_value(values, group_idx, array.value(row_idx as usize)); } } + other => return internal_err!("string_agg unexpected data type: {other}"), } + + Ok(()) } } @@ -374,32 +460,52 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - self.values.resize(total_num_groups, None); + self.num_groups = self.num_groups.max(total_num_groups); let array = apply_filter_as_nulls(&values[0], opt_filter)?; - match array.data_type() { + + let entries = match array.data_type() { DataType::Utf8 => { - self.append_batch(array.as_string::().iter(), group_indices) + self.append_rows(array.as_string::().iter(), group_indices) } DataType::LargeUtf8 => { - self.append_batch(array.as_string::().iter(), group_indices) + self.append_rows(array.as_string::().iter(), group_indices) } DataType::Utf8View => { - self.append_batch(array.as_string_view().iter(), group_indices) - } - other => { - return internal_err!("string_agg unexpected data type: {other}"); + self.append_rows(array.as_string_view().iter(), group_indices) } + other => return internal_err!("string_agg unexpected data type: {other}"), + }; + + if !entries.is_empty() { + self.batches.push(array); + self.batch_entries.push(entries); } + Ok(()) } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let to_emit = emit_to.take_needed(&mut self.values); - let emitted_bytes: usize = to_emit - .iter() - .filter_map(|opt| opt.as_ref().map(|s| s.len())) - .sum(); - self.total_data_bytes -= emitted_bytes; + let emit_groups = match emit_to { + EmitTo::All => self.num_groups, + EmitTo::First(n) => n, + }; + + let mut to_emit = vec![None; emit_groups]; + + for (batch, entries) in self.batches.iter().zip(&self.batch_entries) { + Self::append_batch_values( + &mut to_emit, + entries, + batch, + &self.delimiter, + emit_groups, + )?; + } + + match emit_to { + EmitTo::All => self.clear_state(), + EmitTo::First(_) => self.retain_after_emit(emit_groups), + } let result: ArrayRef = Arc::new(LargeStringArray::from(to_emit)); Ok(result) @@ -439,8 +545,17 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { } fn size(&self) -> usize { - self.total_data_bytes - + self.values.capacity() * size_of::>() + self.batches + .iter() + .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) + .sum::() + + self.batches.capacity() * size_of::() + + self + .batch_entries + .iter() + .map(|entries| entries.capacity() * size_of::<(u32, u32)>()) + .sum::() + + self.batch_entries.capacity() * size_of::>() + self.delimiter.capacity() + size_of_val(self) } From 1cc0397da8374df7b486e2089f4d7dfcd2d1b199 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 23:08:07 +0800 Subject: [PATCH 02/13] Refactor row handling and simplify logic Remove unnecessary &mut self from append_rows. Consolidate repeated string-append loop into a typed private helper using ArrayAccessor. Eliminate redundant runtime null checks in favor of non-null entry invariant with debug_assert!. Simplify retain_after_emit into a single filter-and-renumber pass. Trim local ceremony in evaluate() and state() for clarity. --- .../functions-aggregate/src/string_agg.rs | 142 ++++++++++-------- 1 file changed, 78 insertions(+), 64 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 8dfec52fc663..2931ea70d881 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -23,7 +23,9 @@ use std::sync::Arc; use crate::array_agg::ArrayAgg; -use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, LargeStringArray}; +use arrow::array::{ + Array, ArrayAccessor, ArrayRef, AsArray, BooleanArray, LargeStringArray, +}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; use datafusion_common::{ @@ -354,10 +356,17 @@ impl StringAggGroupsAccumulator { let mut retained_batches = Vec::with_capacity(self.batches.len()); let mut retained_entries = Vec::with_capacity(self.batch_entries.len()); - for (batch, mut entries) in - self.batches.drain(..).zip(self.batch_entries.drain(..)) - { - entries.retain(|(group_idx, _)| *group_idx >= emit_groups); + for (batch, entries) in self.batches.drain(..).zip(self.batch_entries.drain(..)) { + let entries: Vec<_> = entries + .into_iter() + .filter_map(|(group_idx, row_idx)| { + if group_idx >= emit_groups { + Some((group_idx - emit_groups, row_idx)) + } else { + None + } + }) + .collect(); if entries.is_empty() { continue; } @@ -365,10 +374,6 @@ impl StringAggGroupsAccumulator { // Keep the original arrays for this prototype and only renumber // retained groups. SUB_ISSUE_04 will compact mixed batches so // partially emitted batches no longer pin their full inputs. - for (group_idx, _) in &mut entries { - *group_idx -= emit_groups; - } - retained_batches.push(batch); retained_entries.push(entries); } @@ -379,21 +384,51 @@ impl StringAggGroupsAccumulator { } fn append_rows<'a>( - &mut self, iter: impl Iterator>, group_indices: &[usize], ) -> Vec<(u32, u32)> { - let mut entries = Vec::new(); + iter.zip(group_indices.iter()) + .enumerate() + .filter_map(|(row_idx, (opt_value, &group_idx))| { + opt_value.map(|_| (group_idx as u32, row_idx as u32)) + }) + .collect() + } - for (row_idx, (opt_value, &group_idx)) in - iter.zip(group_indices.iter()).enumerate() - { - if opt_value.is_some() { - entries.push((group_idx as u32, row_idx as u32)); + fn append_value( + values: &mut [Option], + group_idx: usize, + value: &str, + delimiter: &str, + ) { + match &mut values[group_idx] { + Some(existing) => { + existing.push_str(delimiter); + existing.push_str(value); } + slot @ None => *slot = Some(value.to_string()), } + } - entries + fn append_batch_values_typed<'a, A>( + values: &mut [Option], + entries: &[(u32, u32)], + array: A, + delimiter: &str, + emit_groups: usize, + ) where + A: ArrayAccessor, + { + for &(group_idx, row_idx) in entries { + let group_idx = group_idx as usize; + if group_idx >= emit_groups { + continue; + } + + let row_idx = row_idx as usize; + debug_assert!(!array.is_null(row_idx)); + Self::append_value(values, group_idx, array.value(row_idx), delimiter); + } } fn append_batch_values( @@ -403,48 +438,28 @@ impl StringAggGroupsAccumulator { delimiter: &str, emit_groups: usize, ) -> Result<()> { - let append_value = - |values: &mut [Option], group_idx: usize, value: &str| { - match &mut values[group_idx] { - Some(existing) => { - existing.push_str(delimiter); - existing.push_str(value); - } - slot @ None => *slot = Some(value.to_string()), - } - }; - match array.data_type() { - DataType::Utf8 => { - let array = array.as_string::(); - for &(group_idx, row_idx) in entries { - let group_idx = group_idx as usize; - if group_idx >= emit_groups || array.is_null(row_idx as usize) { - continue; - } - append_value(values, group_idx, array.value(row_idx as usize)); - } - } - DataType::LargeUtf8 => { - let array = array.as_string::(); - for &(group_idx, row_idx) in entries { - let group_idx = group_idx as usize; - if group_idx >= emit_groups || array.is_null(row_idx as usize) { - continue; - } - append_value(values, group_idx, array.value(row_idx as usize)); - } - } - DataType::Utf8View => { - let array = array.as_string_view(); - for &(group_idx, row_idx) in entries { - let group_idx = group_idx as usize; - if group_idx >= emit_groups || array.is_null(row_idx as usize) { - continue; - } - append_value(values, group_idx, array.value(row_idx as usize)); - } - } + DataType::Utf8 => Self::append_batch_values_typed( + values, + entries, + array.as_string::(), + delimiter, + emit_groups, + ), + DataType::LargeUtf8 => Self::append_batch_values_typed( + values, + entries, + array.as_string::(), + delimiter, + emit_groups, + ), + DataType::Utf8View => Self::append_batch_values_typed( + values, + entries, + array.as_string_view(), + delimiter, + emit_groups, + ), other => return internal_err!("string_agg unexpected data type: {other}"), } @@ -465,13 +480,13 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { let entries = match array.data_type() { DataType::Utf8 => { - self.append_rows(array.as_string::().iter(), group_indices) + Self::append_rows(array.as_string::().iter(), group_indices) } DataType::LargeUtf8 => { - self.append_rows(array.as_string::().iter(), group_indices) + Self::append_rows(array.as_string::().iter(), group_indices) } DataType::Utf8View => { - self.append_rows(array.as_string_view().iter(), group_indices) + Self::append_rows(array.as_string_view().iter(), group_indices) } other => return internal_err!("string_agg unexpected data type: {other}"), }; @@ -507,12 +522,11 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { EmitTo::First(_) => self.retain_after_emit(emit_groups), } - let result: ArrayRef = Arc::new(LargeStringArray::from(to_emit)); - Ok(result) + Ok(Arc::new(LargeStringArray::from(to_emit))) } fn state(&mut self, emit_to: EmitTo) -> Result> { - self.evaluate(emit_to).map(|arr| vec![arr]) + Ok(vec![self.evaluate(emit_to)?]) } fn merge_batch( From 3e481f837ff212262a070eacbe783e3e85968042 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 23:15:40 +0800 Subject: [PATCH 03/13] Refactor string_agg handling and rename slot appender Consolidate string-like array routing through a single StringInputArray abstraction to improve maintainability. Rename the slot appender to append_group_value for better readability of the lazy-assembly path. --- .../functions-aggregate/src/string_agg.rs | 131 ++++++++++++------ 1 file changed, 85 insertions(+), 46 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 2931ea70d881..2e72b9bbf0bf 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -24,7 +24,8 @@ use std::sync::Arc; use crate::array_agg::ArrayAgg; use arrow::array::{ - Array, ArrayAccessor, ArrayRef, AsArray, BooleanArray, LargeStringArray, + Array, ArrayAccessor, ArrayRef, AsArray, BooleanArray, GenericStringArray, + LargeStringArray, StringArrayType, StringViewArray, }; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; @@ -333,6 +334,73 @@ struct StringAggGroupsAccumulator { num_groups: usize, } +enum StringInputArray<'a> { + Utf8(&'a GenericStringArray), + LargeUtf8(&'a GenericStringArray), + Utf8View(&'a StringViewArray), +} + +impl<'a> StringInputArray<'a> { + fn try_new(array: &'a ArrayRef) -> Result { + match array.data_type() { + DataType::Utf8 => Ok(Self::Utf8(array.as_string::())), + DataType::LargeUtf8 => Ok(Self::LargeUtf8(array.as_string::())), + DataType::Utf8View => Ok(Self::Utf8View(array.as_string_view())), + other => internal_err!("string_agg unexpected data type: {other}"), + } + } + + fn append_rows(&self, group_indices: &[usize]) -> Vec<(u32, u32)> { + match self { + Self::Utf8(array) => { + StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + } + Self::LargeUtf8(array) => { + StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + } + Self::Utf8View(array) => { + StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + } + } + } + + fn append_batch_values( + &self, + values: &mut [Option], + entries: &[(u32, u32)], + delimiter: &str, + emit_groups: usize, + ) { + match self { + Self::Utf8(array) => StringAggGroupsAccumulator::append_batch_values_typed( + values, + entries, + *array, + delimiter, + emit_groups, + ), + Self::LargeUtf8(array) => { + StringAggGroupsAccumulator::append_batch_values_typed( + values, + entries, + *array, + delimiter, + emit_groups, + ) + } + Self::Utf8View(array) => { + StringAggGroupsAccumulator::append_batch_values_typed( + values, + entries, + *array, + delimiter, + emit_groups, + ) + } + } + } +} + impl StringAggGroupsAccumulator { fn new(delimiter: String) -> Self { Self { @@ -383,11 +451,13 @@ impl StringAggGroupsAccumulator { self.num_groups -= emit_groups as usize; } - fn append_rows<'a>( - iter: impl Iterator>, - group_indices: &[usize], - ) -> Vec<(u32, u32)> { - iter.zip(group_indices.iter()) + fn append_rows_typed<'a, A>(array: A, group_indices: &[usize]) -> Vec<(u32, u32)> + where + A: StringArrayType<'a>, + { + array + .iter() + .zip(group_indices.iter()) .enumerate() .filter_map(|(row_idx, (opt_value, &group_idx))| { opt_value.map(|_| (group_idx as u32, row_idx as u32)) @@ -395,7 +465,7 @@ impl StringAggGroupsAccumulator { .collect() } - fn append_value( + fn append_group_value( values: &mut [Option], group_idx: usize, value: &str, @@ -427,7 +497,7 @@ impl StringAggGroupsAccumulator { let row_idx = row_idx as usize; debug_assert!(!array.is_null(row_idx)); - Self::append_value(values, group_idx, array.value(row_idx), delimiter); + Self::append_group_value(values, group_idx, array.value(row_idx), delimiter); } } @@ -438,31 +508,12 @@ impl StringAggGroupsAccumulator { delimiter: &str, emit_groups: usize, ) -> Result<()> { - match array.data_type() { - DataType::Utf8 => Self::append_batch_values_typed( - values, - entries, - array.as_string::(), - delimiter, - emit_groups, - ), - DataType::LargeUtf8 => Self::append_batch_values_typed( - values, - entries, - array.as_string::(), - delimiter, - emit_groups, - ), - DataType::Utf8View => Self::append_batch_values_typed( - values, - entries, - array.as_string_view(), - delimiter, - emit_groups, - ), - other => return internal_err!("string_agg unexpected data type: {other}"), - } - + StringInputArray::try_new(array)?.append_batch_values( + values, + entries, + delimiter, + emit_groups, + ); Ok(()) } } @@ -477,19 +528,7 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { ) -> Result<()> { self.num_groups = self.num_groups.max(total_num_groups); let array = apply_filter_as_nulls(&values[0], opt_filter)?; - - let entries = match array.data_type() { - DataType::Utf8 => { - Self::append_rows(array.as_string::().iter(), group_indices) - } - DataType::LargeUtf8 => { - Self::append_rows(array.as_string::().iter(), group_indices) - } - DataType::Utf8View => { - Self::append_rows(array.as_string_view().iter(), group_indices) - } - other => return internal_err!("string_agg unexpected data type: {other}"), - }; + let entries = StringInputArray::try_new(&array)?.append_rows(group_indices); if !entries.is_empty() { self.batches.push(array); From 75faa420275a5e47fdae40edca6e6c9424373683 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 12:04:27 +0800 Subject: [PATCH 04/13] Add benchmark --- .../core/benches/aggregate_query_sql.rs | 95 ++++++++++++------- datafusion/core/benches/data_utils/mod.rs | 78 ++++++++++++++- 2 files changed, 135 insertions(+), 38 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index d7e24aceba17..8eb23218f013 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -17,8 +17,8 @@ mod data_utils; -use criterion::{Criterion, criterion_group, criterion_main}; -use data_utils::create_table_provider; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload}; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -36,13 +36,38 @@ fn create_context( partitions_len: usize, array_len: usize, batch_size: usize, +) -> Result>> { + create_context_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +fn create_context_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = create_table_provider(partitions_len, array_len, batch_size)?; + let provider = create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + utf8_payload_profile, + )?; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } +fn string_agg_sql(group_by_column: &str) -> String { + format!( + "SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}" + ) +} + fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -296,38 +321,42 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("string_agg_query_group_by_few_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_narrow, string_agg(utf8, ',') \ - FROM t GROUP BY u64_narrow", - ) - }) - }); + // These payload sizes keep the original 4-value cardinality while changing + // only the bytes copied into grouped `string_agg` state: + // - small_3b preserves the existing `hi0`..`hi3` baseline + // - medium_64b makes copy costs measurable without overwhelming the query + // - large_1024b stresses both CPU and memory behavior + let string_agg_profiles = [ + (Utf8PayloadProfile::Small, "small_3b"), + (Utf8PayloadProfile::Medium, "medium_64b"), + (Utf8PayloadProfile::Large, "large_1024b"), + ] + .into_iter() + .map(|(profile, label)| { + ( + label, + create_context_with_payload(partitions_len, array_len, batch_size, profile) + .unwrap(), + ) + }) + .collect::>(); - c.bench_function("string_agg_query_group_by_mid_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_mid, string_agg(utf8, ',') \ - FROM t GROUP BY u64_mid", - ) - }) - }); + let string_agg_queries = [ + ("few_groups", string_agg_sql("u64_narrow")), + ("mid_groups", string_agg_sql("u64_mid")), + ("many_groups", string_agg_sql("u64_wide")), + ]; - c.bench_function("string_agg_query_group_by_many_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_wide, string_agg(utf8, ',') \ - FROM t GROUP BY u64_wide", - ) - }) - }); + let mut string_agg_group = c.benchmark_group("string_agg_payloads"); + for (query_name, sql) in &string_agg_queries { + for (payload_name, payload_ctx) in &string_agg_profiles { + string_agg_group + .bench_function(BenchmarkId::new(*query_name, *payload_name), |b| { + b.iter(|| query(payload_ctx.clone(), &rt, sql)) + }); + } + } + string_agg_group.finish(); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 728c6490c72b..250f393408b9 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -35,6 +35,23 @@ use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; +/// Payload profile for the benchmark `utf8` column. +/// +/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and +/// large profiles keep the same low cardinality but scale each value's byte +/// width so string aggregation can expose the cost of copying larger payloads. +#[derive(Clone, Copy, Debug)] +pub enum Utf8PayloadProfile { + /// 3-byte baseline values such as `hi0`. + Small, + /// 64-byte payloads that are large enough to make copying noticeable + /// without dominating the benchmark with allocator churn. + Medium, + /// 1024-byte payloads that amplify both CPU and memory pressure in + /// grouped `string_agg` workloads. + Large, +} + /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. #[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly @@ -44,9 +61,32 @@ pub fn create_table_provider( array_len: usize, batch_size: usize, ) -> Result> { + create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +/// Create an in-memory table with a configurable `utf8` payload size. +#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly +#[allow(dead_code)] +pub fn create_table_provider_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, +) -> Result> { + let _ = Utf8PayloadProfile::all(); let schema = Arc::new(create_schema()); - let partitions = - create_record_batches(&schema, array_len, partitions_len, batch_size); + let partitions = create_record_batches( + &schema, + array_len, + partitions_len, + batch_size, + utf8_payload_profile, + ); // declare a table in memory. In spark API, this corresponds to createDataFrame(...). MemTable::try_new(schema, partitions).map(Arc::new) } @@ -91,12 +131,14 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, + payloads: &[String; 4], ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let keys = StringArray::from_iter_values( - (0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())), - ); + let keys = StringArray::from_iter_values((0..batch_size).map(|_| { + let suffix = *key_suffixes.choose(rng).unwrap(); + payloads[suffix].as_str() + })); let values = create_data(rng, batch_size, 0.5); @@ -146,10 +188,12 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); let batches_per_partition = array_len / batch_size / partitions_len; + let payloads = utf8_payload_profile.payloads(); for _ in 0..partitions_len { let mut batches = Vec::with_capacity(batches_per_partition); @@ -159,6 +203,7 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, + &payloads, )); } partitions.push(batches); @@ -166,6 +211,29 @@ pub fn create_record_batches( partitions } +impl Utf8PayloadProfile { + fn all() -> [Self; 3] { + [Self::Small, Self::Medium, Self::Large] + } + + fn payloads(self) -> [String; 4] { + std::array::from_fn(|idx| match self { + Self::Small => format!("hi{idx}"), + Self::Medium => payload_string("mid", idx, 64), + Self::Large => payload_string("large", idx, 1024), + }) + } +} + +fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { + let mut value = format!("{prefix}{suffix}_"); + value.extend(std::iter::repeat_n( + (b'a' + suffix as u8) as char, + target_len - value.len(), + )); + value +} + /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From 8be4df1f2c015a9622ba75bc3ee94bec7f3a4d54 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 14:26:40 +0800 Subject: [PATCH 05/13] clippy fix - Refactor array parameter types in string_agg.rs Update append_rows_typed and append_batch_values_typed to accept array references instead of values. Modify call sites in StringInputArray to pass references, improving memory efficiency and consistency across function calls. --- datafusion/functions-aggregate/src/string_agg.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 2e72b9bbf0bf..67e7a62bf4fb 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -353,13 +353,13 @@ impl<'a> StringInputArray<'a> { fn append_rows(&self, group_indices: &[usize]) -> Vec<(u32, u32)> { match self { Self::Utf8(array) => { - StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + StringAggGroupsAccumulator::append_rows_typed(array, group_indices) } Self::LargeUtf8(array) => { - StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + StringAggGroupsAccumulator::append_rows_typed(array, group_indices) } Self::Utf8View(array) => { - StringAggGroupsAccumulator::append_rows_typed(*array, group_indices) + StringAggGroupsAccumulator::append_rows_typed(array, group_indices) } } } @@ -375,7 +375,7 @@ impl<'a> StringInputArray<'a> { Self::Utf8(array) => StringAggGroupsAccumulator::append_batch_values_typed( values, entries, - *array, + array, delimiter, emit_groups, ), @@ -383,7 +383,7 @@ impl<'a> StringInputArray<'a> { StringAggGroupsAccumulator::append_batch_values_typed( values, entries, - *array, + array, delimiter, emit_groups, ) @@ -392,7 +392,7 @@ impl<'a> StringInputArray<'a> { StringAggGroupsAccumulator::append_batch_values_typed( values, entries, - *array, + array, delimiter, emit_groups, ) @@ -451,7 +451,7 @@ impl StringAggGroupsAccumulator { self.num_groups -= emit_groups as usize; } - fn append_rows_typed<'a, A>(array: A, group_indices: &[usize]) -> Vec<(u32, u32)> + fn append_rows_typed<'a, A>(array: &A, group_indices: &[usize]) -> Vec<(u32, u32)> where A: StringArrayType<'a>, { @@ -483,7 +483,7 @@ impl StringAggGroupsAccumulator { fn append_batch_values_typed<'a, A>( values: &mut [Option], entries: &[(u32, u32)], - array: A, + array: &A, delimiter: &str, emit_groups: usize, ) where From fc5345f1946458464c95dadb4722a20a75167cb4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 19:28:27 +0800 Subject: [PATCH 06/13] Optimize string_agg for hybrid eager and deferred modes Adjust string_agg to implement a hybrid accumulator, offering eager updates for lightweight workloads and switching to deferred row tracking for larger batches. This change enhances performance while maintaining efficiency. Included mixed-mode regression tests to cover various batch scenarios and ensure correctness. --- .../functions-aggregate/src/string_agg.rs | 174 ++++++++++++++++-- 1 file changed, 154 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 67e7a62bf4fb..e1d8e10d91cc 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -18,7 +18,7 @@ //! [`StringAgg`] accumulator for the `string_agg` function use std::hash::Hash; -use std::mem::size_of_val; +use std::mem::{size_of, size_of_val}; use std::sync::Arc; use crate::array_agg::ArrayAgg; @@ -326,6 +326,10 @@ fn filter_index(values: &[T], index: usize) -> Vec { struct StringAggGroupsAccumulator { /// The delimiter placed between concatenated values. delimiter: String, + /// Materialized string state for groups that use the eager fast path. + values: Vec>, + /// Running total of bytes stored in `values`. + total_data_bytes: usize, /// Source arrays retained from input batches or merged state batches. batches: Vec, /// Per-batch `(group_idx, row_idx)` pairs for non-null rows. @@ -341,6 +345,14 @@ enum StringInputArray<'a> { } impl<'a> StringInputArray<'a> { + fn sample_non_null_len(&self) -> Option { + match self { + Self::Utf8(array) => array.iter().flatten().next().map(str::len), + Self::LargeUtf8(array) => array.iter().flatten().next().map(str::len), + Self::Utf8View(array) => array.iter().flatten().next().map(str::len), + } + } + fn try_new(array: &'a ArrayRef) -> Result { match array.data_type() { DataType::Utf8 => Ok(Self::Utf8(array.as_string::())), @@ -364,6 +376,34 @@ impl<'a> StringInputArray<'a> { } } + fn append_materialized( + &self, + values: &mut [Option], + group_indices: &[usize], + delimiter: &str, + ) -> usize { + match self { + Self::Utf8(array) => StringAggGroupsAccumulator::append_batch_typed( + values, + array.iter(), + group_indices, + delimiter, + ), + Self::LargeUtf8(array) => StringAggGroupsAccumulator::append_batch_typed( + values, + array.iter(), + group_indices, + delimiter, + ), + Self::Utf8View(array) => StringAggGroupsAccumulator::append_batch_typed( + values, + array.iter(), + group_indices, + delimiter, + ), + } + } + fn append_batch_values( &self, values: &mut [Option], @@ -402,9 +442,14 @@ impl<'a> StringInputArray<'a> { } impl StringAggGroupsAccumulator { + const DEFER_GROUP_THRESHOLD: usize = 32; + const DEFER_PAYLOAD_LEN_THRESHOLD: usize = 32; + fn new(delimiter: String) -> Self { Self { delimiter, + values: Vec::new(), + total_data_bytes: 0, batches: Vec::new(), batch_entries: Vec::new(), num_groups: 0, @@ -414,6 +459,8 @@ impl StringAggGroupsAccumulator { fn clear_state(&mut self) { // `size()` measures Vec capacity rather than len, so allocate new // buffers instead of using `clear()`. + self.values = Vec::new(); + self.total_data_bytes = 0; self.batches = Vec::new(); self.batch_entries = Vec::new(); self.num_groups = 0; @@ -448,7 +495,6 @@ impl StringAggGroupsAccumulator { self.batches = retained_batches; self.batch_entries = retained_entries; - self.num_groups -= emit_groups as usize; } fn append_rows_typed<'a, A>(array: &A, group_indices: &[usize]) -> Vec<(u32, u32)> @@ -470,16 +516,40 @@ impl StringAggGroupsAccumulator { group_idx: usize, value: &str, delimiter: &str, - ) { + ) -> usize { match &mut values[group_idx] { Some(existing) => { + let added = delimiter.len() + value.len(); + existing.reserve(added); existing.push_str(delimiter); existing.push_str(value); + added + } + slot @ None => { + *slot = Some(value.to_string()); + value.len() } - slot @ None => *slot = Some(value.to_string()), } } + fn append_batch_typed<'a, I>( + values: &mut [Option], + iter: I, + group_indices: &[usize], + delimiter: &str, + ) -> usize + where + I: Iterator>, + { + iter.zip(group_indices.iter()) + .filter_map(|(opt_value, &group_idx)| { + opt_value.map(|value| { + Self::append_group_value(values, group_idx, value, delimiter) + }) + }) + .sum() + } + fn append_batch_values_typed<'a, A>( values: &mut [Option], entries: &[(u32, u32)], @@ -497,7 +567,12 @@ impl StringAggGroupsAccumulator { let row_idx = row_idx as usize; debug_assert!(!array.is_null(row_idx)); - Self::append_group_value(values, group_idx, array.value(row_idx), delimiter); + let _ = Self::append_group_value( + values, + group_idx, + array.value(row_idx), + delimiter, + ); } } @@ -516,6 +591,17 @@ impl StringAggGroupsAccumulator { ); Ok(()) } + + fn should_defer( + &self, + input: &StringInputArray<'_>, + total_num_groups: usize, + ) -> bool { + total_num_groups >= Self::DEFER_GROUP_THRESHOLD + && input + .sample_non_null_len() + .is_some_and(|len| len >= Self::DEFER_PAYLOAD_LEN_THRESHOLD) + } } impl GroupsAccumulator for StringAggGroupsAccumulator { @@ -527,24 +613,35 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { total_num_groups: usize, ) -> Result<()> { self.num_groups = self.num_groups.max(total_num_groups); + self.values.resize(total_num_groups, None); let array = apply_filter_as_nulls(&values[0], opt_filter)?; - let entries = StringInputArray::try_new(&array)?.append_rows(group_indices); + let input = StringInputArray::try_new(&array)?; - if !entries.is_empty() { - self.batches.push(array); - self.batch_entries.push(entries); + if self.should_defer(&input, total_num_groups) { + let entries = input.append_rows(group_indices); + if !entries.is_empty() { + self.batches.push(array); + self.batch_entries.push(entries); + } + } else { + self.total_data_bytes += input.append_materialized( + &mut self.values, + group_indices, + &self.delimiter, + ); } Ok(()) } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let emit_groups = match emit_to { - EmitTo::All => self.num_groups, - EmitTo::First(n) => n, - }; - - let mut to_emit = vec![None; emit_groups]; + let mut to_emit = emit_to.take_needed(&mut self.values); + let emit_groups = to_emit.len(); + let emitted_bytes: usize = to_emit + .iter() + .filter_map(|opt| opt.as_ref().map(|s| s.len())) + .sum(); + self.total_data_bytes -= emitted_bytes; for (batch, entries) in self.batches.iter().zip(&self.batch_entries) { Self::append_batch_values( @@ -558,7 +655,10 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { match emit_to { EmitTo::All => self.clear_state(), - EmitTo::First(_) => self.retain_after_emit(emit_groups), + EmitTo::First(_) => { + self.retain_after_emit(emit_groups); + self.num_groups = self.values.len(); + } } Ok(Arc::new(LargeStringArray::from(to_emit))) @@ -598,10 +698,13 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { } fn size(&self) -> usize { - self.batches - .iter() - .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) - .sum::() + self.total_data_bytes + + self.values.capacity() * size_of::>() + + self + .batches + .iter() + .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) + .sum::() + self.batches.capacity() * size_of::() + self .batch_entries @@ -1120,4 +1223,35 @@ mod tests { ); Ok(()) } + + #[test] + fn groups_mixed_eager_and_deferred_batches() -> Result<()> { + let mut acc = make_groups_acc(","); + + let eager_values: ArrayRef = + Arc::new(LargeStringArray::from(vec!["a", "b", "c", "d"])); + acc.update_batch(&[eager_values], &[0, 1, 0, 1], None, 40)?; + + let deferred_values: ArrayRef = Arc::new(LargeStringArray::from(vec![ + "large0_abcdefghijklmnopqrstuvwxyzabcdef", + "large1_bcdefghijklmnopqrstuvwxyzabcdefg", + "large2_cdefghijklmnopqrstuvwxyzabcdefgh", + ])); + acc.update_batch(&[deferred_values], &[0, 1, 39], None, 40)?; + + let result = evaluate_groups(&mut acc, EmitTo::First(2)); + assert_eq!( + result, + vec![ + Some("a,c,large0_abcdefghijklmnopqrstuvwxyzabcdef".to_string()), + Some("b,d,large1_bcdefghijklmnopqrstuvwxyzabcdefg".to_string()), + ] + ); + + let remaining = evaluate_groups(&mut acc, EmitTo::All); + let mut expected = vec![None; 38]; + expected[37] = Some("large2_cdefghijklmnopqrstuvwxyzabcdefgh".to_string()); + assert_eq!(remaining, expected); + Ok(()) + } } From e59316ff172bb84792fb176abdbbb9410d3942a6 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 22:09:11 +0800 Subject: [PATCH 07/13] Refactor dispatch logic using new macro Eliminate repeated match arms in string_agg.rs by introducing a local dispatch macro. This enhances clarity and readability, allowing each method to focus on intent while simplifying maintenance for future changes. The refactor preserves existing static dispatch behavior, ensuring that all targeted tests continue to pass. --- .../functions-aggregate/src/string_agg.rs | 78 ++++++------------- 1 file changed, 24 insertions(+), 54 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index e1d8e10d91cc..a2c9879776de 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -344,13 +344,21 @@ enum StringInputArray<'a> { Utf8View(&'a StringViewArray), } +macro_rules! dispatch_string_input_array { + ($self:expr, $array:ident => $expr:expr) => { + match $self { + Self::Utf8($array) => $expr, + Self::LargeUtf8($array) => $expr, + Self::Utf8View($array) => $expr, + } + }; +} + impl<'a> StringInputArray<'a> { fn sample_non_null_len(&self) -> Option { - match self { - Self::Utf8(array) => array.iter().flatten().next().map(str::len), - Self::LargeUtf8(array) => array.iter().flatten().next().map(str::len), - Self::Utf8View(array) => array.iter().flatten().next().map(str::len), - } + dispatch_string_input_array!(self, array => { + array.iter().flatten().next().map(str::len) + }) } fn try_new(array: &'a ArrayRef) -> Result { @@ -363,17 +371,9 @@ impl<'a> StringInputArray<'a> { } fn append_rows(&self, group_indices: &[usize]) -> Vec<(u32, u32)> { - match self { - Self::Utf8(array) => { - StringAggGroupsAccumulator::append_rows_typed(array, group_indices) - } - Self::LargeUtf8(array) => { - StringAggGroupsAccumulator::append_rows_typed(array, group_indices) - } - Self::Utf8View(array) => { - StringAggGroupsAccumulator::append_rows_typed(array, group_indices) - } - } + dispatch_string_input_array!(self, array => { + StringAggGroupsAccumulator::append_rows_typed(array, group_indices) + }) } fn append_materialized( @@ -382,26 +382,14 @@ impl<'a> StringInputArray<'a> { group_indices: &[usize], delimiter: &str, ) -> usize { - match self { - Self::Utf8(array) => StringAggGroupsAccumulator::append_batch_typed( - values, - array.iter(), - group_indices, - delimiter, - ), - Self::LargeUtf8(array) => StringAggGroupsAccumulator::append_batch_typed( - values, - array.iter(), - group_indices, - delimiter, - ), - Self::Utf8View(array) => StringAggGroupsAccumulator::append_batch_typed( + dispatch_string_input_array!(self, array => { + StringAggGroupsAccumulator::append_batch_typed( values, array.iter(), group_indices, delimiter, - ), - } + ) + }) } fn append_batch_values( @@ -411,33 +399,15 @@ impl<'a> StringInputArray<'a> { delimiter: &str, emit_groups: usize, ) { - match self { - Self::Utf8(array) => StringAggGroupsAccumulator::append_batch_values_typed( + dispatch_string_input_array!(self, array => { + StringAggGroupsAccumulator::append_batch_values_typed( values, entries, array, delimiter, emit_groups, - ), - Self::LargeUtf8(array) => { - StringAggGroupsAccumulator::append_batch_values_typed( - values, - entries, - array, - delimiter, - emit_groups, - ) - } - Self::Utf8View(array) => { - StringAggGroupsAccumulator::append_batch_values_typed( - values, - entries, - array, - delimiter, - emit_groups, - ) - } - } + ) + }) } } From a9529c944f5e98044b57e5b8ecee438e2cca0083 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 21:51:46 +0800 Subject: [PATCH 08/13] revert to main benchmark --- .../core/benches/aggregate_query_sql.rs | 95 +++++++------------ datafusion/core/benches/data_utils/mod.rs | 78 +-------------- 2 files changed, 38 insertions(+), 135 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 8eb23218f013..d7e24aceba17 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -17,8 +17,8 @@ mod data_utils; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; -use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload}; +use criterion::{Criterion, criterion_group, criterion_main}; +use data_utils::create_table_provider; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -36,38 +36,13 @@ fn create_context( partitions_len: usize, array_len: usize, batch_size: usize, -) -> Result>> { - create_context_with_payload( - partitions_len, - array_len, - batch_size, - Utf8PayloadProfile::Small, - ) -} - -fn create_context_with_payload( - partitions_len: usize, - array_len: usize, - batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = create_table_provider_with_payload( - partitions_len, - array_len, - batch_size, - utf8_payload_profile, - )?; + let provider = create_table_provider(partitions_len, array_len, batch_size)?; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } -fn string_agg_sql(group_by_column: &str) -> String { - format!( - "SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}" - ) -} - fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -321,42 +296,38 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - // These payload sizes keep the original 4-value cardinality while changing - // only the bytes copied into grouped `string_agg` state: - // - small_3b preserves the existing `hi0`..`hi3` baseline - // - medium_64b makes copy costs measurable without overwhelming the query - // - large_1024b stresses both CPU and memory behavior - let string_agg_profiles = [ - (Utf8PayloadProfile::Small, "small_3b"), - (Utf8PayloadProfile::Medium, "medium_64b"), - (Utf8PayloadProfile::Large, "large_1024b"), - ] - .into_iter() - .map(|(profile, label)| { - ( - label, - create_context_with_payload(partitions_len, array_len, batch_size, profile) - .unwrap(), - ) - }) - .collect::>(); + c.bench_function("string_agg_query_group_by_few_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_narrow, string_agg(utf8, ',') \ + FROM t GROUP BY u64_narrow", + ) + }) + }); - let string_agg_queries = [ - ("few_groups", string_agg_sql("u64_narrow")), - ("mid_groups", string_agg_sql("u64_mid")), - ("many_groups", string_agg_sql("u64_wide")), - ]; + c.bench_function("string_agg_query_group_by_mid_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_mid, string_agg(utf8, ',') \ + FROM t GROUP BY u64_mid", + ) + }) + }); - let mut string_agg_group = c.benchmark_group("string_agg_payloads"); - for (query_name, sql) in &string_agg_queries { - for (payload_name, payload_ctx) in &string_agg_profiles { - string_agg_group - .bench_function(BenchmarkId::new(*query_name, *payload_name), |b| { - b.iter(|| query(payload_ctx.clone(), &rt, sql)) - }); - } - } - string_agg_group.finish(); + c.bench_function("string_agg_query_group_by_many_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_wide, string_agg(utf8, ',') \ + FROM t GROUP BY u64_wide", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 250f393408b9..728c6490c72b 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -35,23 +35,6 @@ use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; -/// Payload profile for the benchmark `utf8` column. -/// -/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and -/// large profiles keep the same low cardinality but scale each value's byte -/// width so string aggregation can expose the cost of copying larger payloads. -#[derive(Clone, Copy, Debug)] -pub enum Utf8PayloadProfile { - /// 3-byte baseline values such as `hi0`. - Small, - /// 64-byte payloads that are large enough to make copying noticeable - /// without dominating the benchmark with allocator churn. - Medium, - /// 1024-byte payloads that amplify both CPU and memory pressure in - /// grouped `string_agg` workloads. - Large, -} - /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. #[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly @@ -61,32 +44,9 @@ pub fn create_table_provider( array_len: usize, batch_size: usize, ) -> Result> { - create_table_provider_with_payload( - partitions_len, - array_len, - batch_size, - Utf8PayloadProfile::Small, - ) -} - -/// Create an in-memory table with a configurable `utf8` payload size. -#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly -#[allow(dead_code)] -pub fn create_table_provider_with_payload( - partitions_len: usize, - array_len: usize, - batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, -) -> Result> { - let _ = Utf8PayloadProfile::all(); let schema = Arc::new(create_schema()); - let partitions = create_record_batches( - &schema, - array_len, - partitions_len, - batch_size, - utf8_payload_profile, - ); + let partitions = + create_record_batches(&schema, array_len, partitions_len, batch_size); // declare a table in memory. In spark API, this corresponds to createDataFrame(...). MemTable::try_new(schema, partitions).map(Arc::new) } @@ -131,14 +91,12 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, - payloads: &[String; 4], ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let keys = StringArray::from_iter_values((0..batch_size).map(|_| { - let suffix = *key_suffixes.choose(rng).unwrap(); - payloads[suffix].as_str() - })); + let keys = StringArray::from_iter_values( + (0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())), + ); let values = create_data(rng, batch_size, 0.5); @@ -188,12 +146,10 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); let batches_per_partition = array_len / batch_size / partitions_len; - let payloads = utf8_payload_profile.payloads(); for _ in 0..partitions_len { let mut batches = Vec::with_capacity(batches_per_partition); @@ -203,7 +159,6 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, - &payloads, )); } partitions.push(batches); @@ -211,29 +166,6 @@ pub fn create_record_batches( partitions } -impl Utf8PayloadProfile { - fn all() -> [Self; 3] { - [Self::Small, Self::Medium, Self::Large] - } - - fn payloads(self) -> [String; 4] { - std::array::from_fn(|idx| match self { - Self::Small => format!("hi{idx}"), - Self::Medium => payload_string("mid", idx, 64), - Self::Large => payload_string("large", idx, 1024), - }) - } -} - -fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { - let mut value = format!("{prefix}{suffix}_"); - value.extend(std::iter::repeat_n( - (b'a' + suffix as u8) as char, - target_len - value.len(), - )); - value -} - /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From 1d84b456533769aeb1318b106bcff73a19177fcf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Apr 2026 10:57:26 +0800 Subject: [PATCH 09/13] Improve string_agg performance with eager mode adjustments Rework the string_agg accumulator to initiate in eager mode, reducing unnecessary allocations. Restore an eager append helper for the hot path and enhance promotion logic to use lightweight size estimates from Arrow buffers. This allows short payloads to remain on the eager path while enabling deferred copying for larger batches. Add regression tests to ensure short payloads do not promote and mixed eager/deferred batches operate correctly. --- .../functions-aggregate/src/string_agg.rs | 228 ++++++++++++------ 1 file changed, 150 insertions(+), 78 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index a2c9879776de..dc29a8898b05 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -330,12 +330,16 @@ struct StringAggGroupsAccumulator { values: Vec>, /// Running total of bytes stored in `values`. total_data_bytes: usize, + /// Deferred bookkeeping is allocated lazily after promotion. + deferred: Option, +} + +#[derive(Debug, Default)] +struct DeferredRows { /// Source arrays retained from input batches or merged state batches. batches: Vec, /// Per-batch `(group_idx, row_idx)` pairs for non-null rows. batch_entries: Vec>, - /// Total number of groups tracked. - num_groups: usize, } enum StringInputArray<'a> { @@ -355,12 +359,6 @@ macro_rules! dispatch_string_input_array { } impl<'a> StringInputArray<'a> { - fn sample_non_null_len(&self) -> Option { - dispatch_string_input_array!(self, array => { - array.iter().flatten().next().map(str::len) - }) - } - fn try_new(array: &'a ArrayRef) -> Result { match array.data_type() { DataType::Utf8 => Ok(Self::Utf8(array.as_string::())), @@ -376,22 +374,6 @@ impl<'a> StringInputArray<'a> { }) } - fn append_materialized( - &self, - values: &mut [Option], - group_indices: &[usize], - delimiter: &str, - ) -> usize { - dispatch_string_input_array!(self, array => { - StringAggGroupsAccumulator::append_batch_typed( - values, - array.iter(), - group_indices, - delimiter, - ) - }) - } - fn append_batch_values( &self, values: &mut [Option], @@ -420,9 +402,7 @@ impl StringAggGroupsAccumulator { delimiter, values: Vec::new(), total_data_bytes: 0, - batches: Vec::new(), - batch_entries: Vec::new(), - num_groups: 0, + deferred: None, } } @@ -431,17 +411,19 @@ impl StringAggGroupsAccumulator { // buffers instead of using `clear()`. self.values = Vec::new(); self.total_data_bytes = 0; - self.batches = Vec::new(); - self.batch_entries = Vec::new(); - self.num_groups = 0; + self.deferred = None; } - fn retain_after_emit(&mut self, emit_groups: usize) { + fn retain_after_emit(deferred: &mut DeferredRows, emit_groups: usize) { let emit_groups = emit_groups as u32; - let mut retained_batches = Vec::with_capacity(self.batches.len()); - let mut retained_entries = Vec::with_capacity(self.batch_entries.len()); - - for (batch, entries) in self.batches.drain(..).zip(self.batch_entries.drain(..)) { + let mut retained_batches = Vec::with_capacity(deferred.batches.len()); + let mut retained_entries = Vec::with_capacity(deferred.batch_entries.len()); + + for (batch, entries) in deferred + .batches + .drain(..) + .zip(deferred.batch_entries.drain(..)) + { let entries: Vec<_> = entries .into_iter() .filter_map(|(group_idx, row_idx)| { @@ -463,8 +445,8 @@ impl StringAggGroupsAccumulator { retained_entries.push(entries); } - self.batches = retained_batches; - self.batch_entries = retained_entries; + deferred.batches = retained_batches; + deferred.batch_entries = retained_entries; } fn append_rows_typed<'a, A>(array: &A, group_indices: &[usize]) -> Vec<(u32, u32)> @@ -520,6 +502,36 @@ impl StringAggGroupsAccumulator { .sum() } + fn append_eager_batch( + &mut self, + array: &ArrayRef, + group_indices: &[usize], + ) -> Result<()> { + let added = match array.data_type() { + DataType::Utf8 => Self::append_batch_typed( + &mut self.values, + array.as_string::().iter(), + group_indices, + &self.delimiter, + ), + DataType::LargeUtf8 => Self::append_batch_typed( + &mut self.values, + array.as_string::().iter(), + group_indices, + &self.delimiter, + ), + DataType::Utf8View => Self::append_batch_typed( + &mut self.values, + array.as_string_view().iter(), + group_indices, + &self.delimiter, + ), + other => return internal_err!("string_agg unexpected data type: {other}"), + }; + self.total_data_bytes += added; + Ok(()) + } + fn append_batch_values_typed<'a, A>( values: &mut [Option], entries: &[(u32, u32)], @@ -562,16 +574,48 @@ impl StringAggGroupsAccumulator { Ok(()) } - fn should_defer( - &self, - input: &StringInputArray<'_>, - total_num_groups: usize, - ) -> bool { + fn estimated_payload_len(array: &ArrayRef) -> Option { + let non_null_rows = array.len().saturating_sub(array.null_count()); + if non_null_rows == 0 { + return None; + } + + match array.data_type() { + DataType::Utf8 => { + Some(array.as_string::().value_data().len() / non_null_rows) + } + DataType::LargeUtf8 => { + Some(array.as_string::().value_data().len() / non_null_rows) + } + DataType::Utf8View => Some( + array + .as_string_view() + .data_buffers() + .iter() + .map(|buffer| buffer.len()) + .sum::() + / non_null_rows, + ), + _ => None, + } + } + + fn should_promote(&self, array: &ArrayRef, total_num_groups: usize) -> bool { total_num_groups >= Self::DEFER_GROUP_THRESHOLD - && input - .sample_non_null_len() + && Self::estimated_payload_len(array) .is_some_and(|len| len >= Self::DEFER_PAYLOAD_LEN_THRESHOLD) } + + fn defer_batch(&mut self, array: ArrayRef, group_indices: &[usize]) -> Result<()> { + let input = StringInputArray::try_new(&array)?; + let entries = input.append_rows(group_indices); + if !entries.is_empty() { + let deferred = self.deferred.get_or_insert_with(DeferredRows::default); + deferred.batches.push(array); + deferred.batch_entries.push(entries); + } + Ok(()) + } } impl GroupsAccumulator for StringAggGroupsAccumulator { @@ -582,23 +626,15 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - self.num_groups = self.num_groups.max(total_num_groups); self.values.resize(total_num_groups, None); let array = apply_filter_as_nulls(&values[0], opt_filter)?; - let input = StringInputArray::try_new(&array)?; - if self.should_defer(&input, total_num_groups) { - let entries = input.append_rows(group_indices); - if !entries.is_empty() { - self.batches.push(array); - self.batch_entries.push(entries); - } + if self.deferred.is_some() { + self.defer_batch(array, group_indices)?; + } else if self.should_promote(&array, total_num_groups) { + self.defer_batch(array, group_indices)?; } else { - self.total_data_bytes += input.append_materialized( - &mut self.values, - group_indices, - &self.delimiter, - ); + self.append_eager_batch(&array, group_indices)?; } Ok(()) @@ -613,21 +649,27 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { .sum(); self.total_data_bytes -= emitted_bytes; - for (batch, entries) in self.batches.iter().zip(&self.batch_entries) { - Self::append_batch_values( - &mut to_emit, - entries, - batch, - &self.delimiter, - emit_groups, - )?; + if let Some(deferred) = &self.deferred { + for (batch, entries) in deferred.batches.iter().zip(&deferred.batch_entries) { + Self::append_batch_values( + &mut to_emit, + entries, + batch, + &self.delimiter, + emit_groups, + )?; + } } match emit_to { EmitTo::All => self.clear_state(), EmitTo::First(_) => { - self.retain_after_emit(emit_groups); - self.num_groups = self.values.len(); + if let Some(deferred) = &mut self.deferred { + Self::retain_after_emit(deferred, emit_groups); + if deferred.batches.is_empty() { + self.deferred = None; + } + } } } @@ -671,17 +713,25 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { self.total_data_bytes + self.values.capacity() * size_of::>() + self - .batches - .iter() - .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) - .sum::() - + self.batches.capacity() * size_of::() - + self - .batch_entries - .iter() - .map(|entries| entries.capacity() * size_of::<(u32, u32)>()) - .sum::() - + self.batch_entries.capacity() * size_of::>() + .deferred + .as_ref() + .map(|deferred| { + deferred + .batches + .iter() + .map(|arr| { + arr.to_data().get_slice_memory_size().unwrap_or_default() + }) + .sum::() + + deferred.batches.capacity() * size_of::() + + deferred + .batch_entries + .iter() + .map(|entries| entries.capacity() * size_of::<(u32, u32)>()) + .sum::() + + deferred.batch_entries.capacity() * size_of::>() + }) + .unwrap_or_default() + self.delimiter.capacity() + size_of_val(self) } @@ -1201,6 +1251,7 @@ mod tests { let eager_values: ArrayRef = Arc::new(LargeStringArray::from(vec!["a", "b", "c", "d"])); acc.update_batch(&[eager_values], &[0, 1, 0, 1], None, 40)?; + assert!(acc.deferred.is_none()); let deferred_values: ArrayRef = Arc::new(LargeStringArray::from(vec![ "large0_abcdefghijklmnopqrstuvwxyzabcdef", @@ -1208,6 +1259,7 @@ mod tests { "large2_cdefghijklmnopqrstuvwxyzabcdefgh", ])); acc.update_batch(&[deferred_values], &[0, 1, 39], None, 40)?; + assert!(acc.deferred.is_some()); let result = evaluate_groups(&mut acc, EmitTo::First(2)); assert_eq!( @@ -1224,4 +1276,24 @@ mod tests { assert_eq!(remaining, expected); Ok(()) } + + #[test] + fn groups_short_payloads_do_not_promote_to_deferred() -> Result<()> { + let mut acc = make_groups_acc(","); + let values: ArrayRef = Arc::new(LargeStringArray::from(vec![ + "aaa", "bbb", "ccc", "ddd", "eee", "fff", + ])); + + acc.update_batch(&[values], &[0, 1, 39, 38, 0, 1], None, 40)?; + + assert!(acc.deferred.is_none()); + let result = evaluate_groups(&mut acc, EmitTo::All); + let mut expected = vec![None; 40]; + expected[0] = Some("aaa,eee".to_string()); + expected[1] = Some("bbb,fff".to_string()); + expected[38] = Some("ddd".to_string()); + expected[39] = Some("ccc".to_string()); + assert_eq!(result, expected); + Ok(()) + } } From efd0f55b9e49f3d5a2b3ee1f50e58d4265148fad Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Apr 2026 11:26:06 +0800 Subject: [PATCH 10/13] Optimize deferred batch handling in StringAggGroupsAccumulator --- datafusion/functions-aggregate/src/string_agg.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index dc29a8898b05..1bac7546a29a 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -629,9 +629,7 @@ impl GroupsAccumulator for StringAggGroupsAccumulator { self.values.resize(total_num_groups, None); let array = apply_filter_as_nulls(&values[0], opt_filter)?; - if self.deferred.is_some() { - self.defer_batch(array, group_indices)?; - } else if self.should_promote(&array, total_num_groups) { + if self.deferred.is_some() || self.should_promote(&array, total_num_groups) { self.defer_batch(array, group_indices)?; } else { self.append_eager_batch(&array, group_indices)?; From 0187a7c482d0c956ea6d1bde54c7ef763908fccd Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Apr 2026 13:13:47 +0800 Subject: [PATCH 11/13] modify benchmark --- .../core/benches/aggregate_query_sql.rs | 95 ++++++++++++------- datafusion/core/benches/data_utils/mod.rs | 78 ++++++++++++++- 2 files changed, 135 insertions(+), 38 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index d7e24aceba17..8eb23218f013 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -17,8 +17,8 @@ mod data_utils; -use criterion::{Criterion, criterion_group, criterion_main}; -use data_utils::create_table_provider; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload}; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -36,13 +36,38 @@ fn create_context( partitions_len: usize, array_len: usize, batch_size: usize, +) -> Result>> { + create_context_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +fn create_context_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = create_table_provider(partitions_len, array_len, batch_size)?; + let provider = create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + utf8_payload_profile, + )?; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } +fn string_agg_sql(group_by_column: &str) -> String { + format!( + "SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}" + ) +} + fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -296,38 +321,42 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("string_agg_query_group_by_few_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_narrow, string_agg(utf8, ',') \ - FROM t GROUP BY u64_narrow", - ) - }) - }); + // These payload sizes keep the original 4-value cardinality while changing + // only the bytes copied into grouped `string_agg` state: + // - small_3b preserves the existing `hi0`..`hi3` baseline + // - medium_64b makes copy costs measurable without overwhelming the query + // - large_1024b stresses both CPU and memory behavior + let string_agg_profiles = [ + (Utf8PayloadProfile::Small, "small_3b"), + (Utf8PayloadProfile::Medium, "medium_64b"), + (Utf8PayloadProfile::Large, "large_1024b"), + ] + .into_iter() + .map(|(profile, label)| { + ( + label, + create_context_with_payload(partitions_len, array_len, batch_size, profile) + .unwrap(), + ) + }) + .collect::>(); - c.bench_function("string_agg_query_group_by_mid_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_mid, string_agg(utf8, ',') \ - FROM t GROUP BY u64_mid", - ) - }) - }); + let string_agg_queries = [ + ("few_groups", string_agg_sql("u64_narrow")), + ("mid_groups", string_agg_sql("u64_mid")), + ("many_groups", string_agg_sql("u64_wide")), + ]; - c.bench_function("string_agg_query_group_by_many_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_wide, string_agg(utf8, ',') \ - FROM t GROUP BY u64_wide", - ) - }) - }); + let mut string_agg_group = c.benchmark_group("string_agg_payloads"); + for (query_name, sql) in &string_agg_queries { + for (payload_name, payload_ctx) in &string_agg_profiles { + string_agg_group + .bench_function(BenchmarkId::new(*query_name, *payload_name), |b| { + b.iter(|| query(payload_ctx.clone(), &rt, sql)) + }); + } + } + string_agg_group.finish(); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 728c6490c72b..250f393408b9 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -35,6 +35,23 @@ use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; +/// Payload profile for the benchmark `utf8` column. +/// +/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and +/// large profiles keep the same low cardinality but scale each value's byte +/// width so string aggregation can expose the cost of copying larger payloads. +#[derive(Clone, Copy, Debug)] +pub enum Utf8PayloadProfile { + /// 3-byte baseline values such as `hi0`. + Small, + /// 64-byte payloads that are large enough to make copying noticeable + /// without dominating the benchmark with allocator churn. + Medium, + /// 1024-byte payloads that amplify both CPU and memory pressure in + /// grouped `string_agg` workloads. + Large, +} + /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. #[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly @@ -44,9 +61,32 @@ pub fn create_table_provider( array_len: usize, batch_size: usize, ) -> Result> { + create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +/// Create an in-memory table with a configurable `utf8` payload size. +#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly +#[allow(dead_code)] +pub fn create_table_provider_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, +) -> Result> { + let _ = Utf8PayloadProfile::all(); let schema = Arc::new(create_schema()); - let partitions = - create_record_batches(&schema, array_len, partitions_len, batch_size); + let partitions = create_record_batches( + &schema, + array_len, + partitions_len, + batch_size, + utf8_payload_profile, + ); // declare a table in memory. In spark API, this corresponds to createDataFrame(...). MemTable::try_new(schema, partitions).map(Arc::new) } @@ -91,12 +131,14 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, + payloads: &[String; 4], ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let keys = StringArray::from_iter_values( - (0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())), - ); + let keys = StringArray::from_iter_values((0..batch_size).map(|_| { + let suffix = *key_suffixes.choose(rng).unwrap(); + payloads[suffix].as_str() + })); let values = create_data(rng, batch_size, 0.5); @@ -146,10 +188,12 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); let batches_per_partition = array_len / batch_size / partitions_len; + let payloads = utf8_payload_profile.payloads(); for _ in 0..partitions_len { let mut batches = Vec::with_capacity(batches_per_partition); @@ -159,6 +203,7 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, + &payloads, )); } partitions.push(batches); @@ -166,6 +211,29 @@ pub fn create_record_batches( partitions } +impl Utf8PayloadProfile { + fn all() -> [Self; 3] { + [Self::Small, Self::Medium, Self::Large] + } + + fn payloads(self) -> [String; 4] { + std::array::from_fn(|idx| match self { + Self::Small => format!("hi{idx}"), + Self::Medium => payload_string("mid", idx, 64), + Self::Large => payload_string("large", idx, 1024), + }) + } +} + +fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { + let mut value = format!("{prefix}{suffix}_"); + value.extend(std::iter::repeat_n( + (b'a' + suffix as u8) as char, + target_len - value.len(), + )); + value +} + /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From ca5d6850e9758261d0db1fa943d9764eb1387ee3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 8 Apr 2026 21:51:46 +0800 Subject: [PATCH 12/13] revert to main benchmark --- .../core/benches/aggregate_query_sql.rs | 95 +++++++------------ datafusion/core/benches/data_utils/mod.rs | 78 +-------------- 2 files changed, 38 insertions(+), 135 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 8eb23218f013..d7e24aceba17 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -17,8 +17,8 @@ mod data_utils; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; -use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload}; +use criterion::{Criterion, criterion_group, criterion_main}; +use data_utils::create_table_provider; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -36,38 +36,13 @@ fn create_context( partitions_len: usize, array_len: usize, batch_size: usize, -) -> Result>> { - create_context_with_payload( - partitions_len, - array_len, - batch_size, - Utf8PayloadProfile::Small, - ) -} - -fn create_context_with_payload( - partitions_len: usize, - array_len: usize, - batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = create_table_provider_with_payload( - partitions_len, - array_len, - batch_size, - utf8_payload_profile, - )?; + let provider = create_table_provider(partitions_len, array_len, batch_size)?; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } -fn string_agg_sql(group_by_column: &str) -> String { - format!( - "SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}" - ) -} - fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -321,42 +296,38 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - // These payload sizes keep the original 4-value cardinality while changing - // only the bytes copied into grouped `string_agg` state: - // - small_3b preserves the existing `hi0`..`hi3` baseline - // - medium_64b makes copy costs measurable without overwhelming the query - // - large_1024b stresses both CPU and memory behavior - let string_agg_profiles = [ - (Utf8PayloadProfile::Small, "small_3b"), - (Utf8PayloadProfile::Medium, "medium_64b"), - (Utf8PayloadProfile::Large, "large_1024b"), - ] - .into_iter() - .map(|(profile, label)| { - ( - label, - create_context_with_payload(partitions_len, array_len, batch_size, profile) - .unwrap(), - ) - }) - .collect::>(); + c.bench_function("string_agg_query_group_by_few_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_narrow, string_agg(utf8, ',') \ + FROM t GROUP BY u64_narrow", + ) + }) + }); - let string_agg_queries = [ - ("few_groups", string_agg_sql("u64_narrow")), - ("mid_groups", string_agg_sql("u64_mid")), - ("many_groups", string_agg_sql("u64_wide")), - ]; + c.bench_function("string_agg_query_group_by_mid_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_mid, string_agg(utf8, ',') \ + FROM t GROUP BY u64_mid", + ) + }) + }); - let mut string_agg_group = c.benchmark_group("string_agg_payloads"); - for (query_name, sql) in &string_agg_queries { - for (payload_name, payload_ctx) in &string_agg_profiles { - string_agg_group - .bench_function(BenchmarkId::new(*query_name, *payload_name), |b| { - b.iter(|| query(payload_ctx.clone(), &rt, sql)) - }); - } - } - string_agg_group.finish(); + c.bench_function("string_agg_query_group_by_many_groups", |b| { + b.iter(|| { + query( + ctx.clone(), + &rt, + "SELECT u64_wide, string_agg(utf8, ',') \ + FROM t GROUP BY u64_wide", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 250f393408b9..728c6490c72b 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -35,23 +35,6 @@ use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; -/// Payload profile for the benchmark `utf8` column. -/// -/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and -/// large profiles keep the same low cardinality but scale each value's byte -/// width so string aggregation can expose the cost of copying larger payloads. -#[derive(Clone, Copy, Debug)] -pub enum Utf8PayloadProfile { - /// 3-byte baseline values such as `hi0`. - Small, - /// 64-byte payloads that are large enough to make copying noticeable - /// without dominating the benchmark with allocator churn. - Medium, - /// 1024-byte payloads that amplify both CPU and memory pressure in - /// grouped `string_agg` workloads. - Large, -} - /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. #[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly @@ -61,32 +44,9 @@ pub fn create_table_provider( array_len: usize, batch_size: usize, ) -> Result> { - create_table_provider_with_payload( - partitions_len, - array_len, - batch_size, - Utf8PayloadProfile::Small, - ) -} - -/// Create an in-memory table with a configurable `utf8` payload size. -#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly -#[allow(dead_code)] -pub fn create_table_provider_with_payload( - partitions_len: usize, - array_len: usize, - batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, -) -> Result> { - let _ = Utf8PayloadProfile::all(); let schema = Arc::new(create_schema()); - let partitions = create_record_batches( - &schema, - array_len, - partitions_len, - batch_size, - utf8_payload_profile, - ); + let partitions = + create_record_batches(&schema, array_len, partitions_len, batch_size); // declare a table in memory. In spark API, this corresponds to createDataFrame(...). MemTable::try_new(schema, partitions).map(Arc::new) } @@ -131,14 +91,12 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, - payloads: &[String; 4], ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let keys = StringArray::from_iter_values((0..batch_size).map(|_| { - let suffix = *key_suffixes.choose(rng).unwrap(); - payloads[suffix].as_str() - })); + let keys = StringArray::from_iter_values( + (0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())), + ); let values = create_data(rng, batch_size, 0.5); @@ -188,12 +146,10 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, - utf8_payload_profile: Utf8PayloadProfile, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); let batches_per_partition = array_len / batch_size / partitions_len; - let payloads = utf8_payload_profile.payloads(); for _ in 0..partitions_len { let mut batches = Vec::with_capacity(batches_per_partition); @@ -203,7 +159,6 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, - &payloads, )); } partitions.push(batches); @@ -211,29 +166,6 @@ pub fn create_record_batches( partitions } -impl Utf8PayloadProfile { - fn all() -> [Self; 3] { - [Self::Small, Self::Medium, Self::Large] - } - - fn payloads(self) -> [String; 4] { - std::array::from_fn(|idx| match self { - Self::Small => format!("hi{idx}"), - Self::Medium => payload_string("mid", idx, 64), - Self::Large => payload_string("large", idx, 1024), - }) - } -} - -fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { - let mut value = format!("{prefix}{suffix}_"); - value.extend(std::iter::repeat_n( - (b'a' + suffix as u8) as char, - target_len - value.len(), - )); - value -} - /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From d1c1799b3991ecb9157e9e1abc3c9013693cb12f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 9 Apr 2026 14:51:07 +0800 Subject: [PATCH 13/13] fix: update comment in string_agg.rs to clarify compact mixed batches todo - Revised comment to indicate a future task for compacting mixed batches in the StringAggGroupsAccumulator implementation. --- datafusion/functions-aggregate/src/string_agg.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 1bac7546a29a..211d3c4d7552 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -439,7 +439,8 @@ impl StringAggGroupsAccumulator { } // Keep the original arrays for this prototype and only renumber - // retained groups. SUB_ISSUE_04 will compact mixed batches so + // retained groups. + // todo: compact mixed batches so // partially emitted batches no longer pin their full inputs. retained_batches.push(batch); retained_entries.push(entries);