Skip to content

Commit ca5d685

Browse files
committed
revert to main benchmark
1 parent 0187a7c commit ca5d685

File tree

2 files changed

+38
-135
lines changed

2 files changed

+38
-135
lines changed

datafusion/core/benches/aggregate_query_sql.rs

Lines changed: 33 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@
1717

1818
mod data_utils;
1919

20-
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
21-
use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload};
20+
use criterion::{Criterion, criterion_group, criterion_main};
21+
use data_utils::create_table_provider;
2222
use datafusion::error::Result;
2323
use datafusion::execution::context::SessionContext;
2424
use parking_lot::Mutex;
@@ -36,38 +36,13 @@ fn create_context(
3636
partitions_len: usize,
3737
array_len: usize,
3838
batch_size: usize,
39-
) -> Result<Arc<Mutex<SessionContext>>> {
40-
create_context_with_payload(
41-
partitions_len,
42-
array_len,
43-
batch_size,
44-
Utf8PayloadProfile::Small,
45-
)
46-
}
47-
48-
fn create_context_with_payload(
49-
partitions_len: usize,
50-
array_len: usize,
51-
batch_size: usize,
52-
utf8_payload_profile: Utf8PayloadProfile,
5339
) -> Result<Arc<Mutex<SessionContext>>> {
5440
let ctx = SessionContext::new();
55-
let provider = create_table_provider_with_payload(
56-
partitions_len,
57-
array_len,
58-
batch_size,
59-
utf8_payload_profile,
60-
)?;
41+
let provider = create_table_provider(partitions_len, array_len, batch_size)?;
6142
ctx.register_table("t", provider)?;
6243
Ok(Arc::new(Mutex::new(ctx)))
6344
}
6445

65-
fn string_agg_sql(group_by_column: &str) -> String {
66-
format!(
67-
"SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}"
68-
)
69-
}
70-
7146
fn criterion_benchmark(c: &mut Criterion) {
7247
let partitions_len = 8;
7348
let array_len = 32768 * 2; // 2^16
@@ -321,42 +296,38 @@ fn criterion_benchmark(c: &mut Criterion) {
321296
})
322297
});
323298

324-
// These payload sizes keep the original 4-value cardinality while changing
325-
// only the bytes copied into grouped `string_agg` state:
326-
// - small_3b preserves the existing `hi0`..`hi3` baseline
327-
// - medium_64b makes copy costs measurable without overwhelming the query
328-
// - large_1024b stresses both CPU and memory behavior
329-
let string_agg_profiles = [
330-
(Utf8PayloadProfile::Small, "small_3b"),
331-
(Utf8PayloadProfile::Medium, "medium_64b"),
332-
(Utf8PayloadProfile::Large, "large_1024b"),
333-
]
334-
.into_iter()
335-
.map(|(profile, label)| {
336-
(
337-
label,
338-
create_context_with_payload(partitions_len, array_len, batch_size, profile)
339-
.unwrap(),
340-
)
341-
})
342-
.collect::<Vec<_>>();
299+
c.bench_function("string_agg_query_group_by_few_groups", |b| {
300+
b.iter(|| {
301+
query(
302+
ctx.clone(),
303+
&rt,
304+
"SELECT u64_narrow, string_agg(utf8, ',') \
305+
FROM t GROUP BY u64_narrow",
306+
)
307+
})
308+
});
343309

344-
let string_agg_queries = [
345-
("few_groups", string_agg_sql("u64_narrow")),
346-
("mid_groups", string_agg_sql("u64_mid")),
347-
("many_groups", string_agg_sql("u64_wide")),
348-
];
310+
c.bench_function("string_agg_query_group_by_mid_groups", |b| {
311+
b.iter(|| {
312+
query(
313+
ctx.clone(),
314+
&rt,
315+
"SELECT u64_mid, string_agg(utf8, ',') \
316+
FROM t GROUP BY u64_mid",
317+
)
318+
})
319+
});
349320

350-
let mut string_agg_group = c.benchmark_group("string_agg_payloads");
351-
for (query_name, sql) in &string_agg_queries {
352-
for (payload_name, payload_ctx) in &string_agg_profiles {
353-
string_agg_group
354-
.bench_function(BenchmarkId::new(*query_name, *payload_name), |b| {
355-
b.iter(|| query(payload_ctx.clone(), &rt, sql))
356-
});
357-
}
358-
}
359-
string_agg_group.finish();
321+
c.bench_function("string_agg_query_group_by_many_groups", |b| {
322+
b.iter(|| {
323+
query(
324+
ctx.clone(),
325+
&rt,
326+
"SELECT u64_wide, string_agg(utf8, ',') \
327+
FROM t GROUP BY u64_wide",
328+
)
329+
})
330+
});
360331
}
361332

362333
criterion_group!(benches, criterion_benchmark);

datafusion/core/benches/data_utils/mod.rs

Lines changed: 5 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -35,23 +35,6 @@ use rand_distr::{Normal, Pareto};
3535
use std::fmt::Write;
3636
use std::sync::Arc;
3737

38-
/// Payload profile for the benchmark `utf8` column.
39-
///
40-
/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and
41-
/// large profiles keep the same low cardinality but scale each value's byte
42-
/// width so string aggregation can expose the cost of copying larger payloads.
43-
#[derive(Clone, Copy, Debug)]
44-
pub enum Utf8PayloadProfile {
45-
/// 3-byte baseline values such as `hi0`.
46-
Small,
47-
/// 64-byte payloads that are large enough to make copying noticeable
48-
/// without dominating the benchmark with allocator churn.
49-
Medium,
50-
/// 1024-byte payloads that amplify both CPU and memory pressure in
51-
/// grouped `string_agg` workloads.
52-
Large,
53-
}
54-
5538
/// create an in-memory table given the partition len, array len, and batch size,
5639
/// and the result table will be of array_len in total, and then partitioned, and batched.
5740
#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly
@@ -61,32 +44,9 @@ pub fn create_table_provider(
6144
array_len: usize,
6245
batch_size: usize,
6346
) -> Result<Arc<MemTable>> {
64-
create_table_provider_with_payload(
65-
partitions_len,
66-
array_len,
67-
batch_size,
68-
Utf8PayloadProfile::Small,
69-
)
70-
}
71-
72-
/// Create an in-memory table with a configurable `utf8` payload size.
73-
#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly
74-
#[allow(dead_code)]
75-
pub fn create_table_provider_with_payload(
76-
partitions_len: usize,
77-
array_len: usize,
78-
batch_size: usize,
79-
utf8_payload_profile: Utf8PayloadProfile,
80-
) -> Result<Arc<MemTable>> {
81-
let _ = Utf8PayloadProfile::all();
8247
let schema = Arc::new(create_schema());
83-
let partitions = create_record_batches(
84-
&schema,
85-
array_len,
86-
partitions_len,
87-
batch_size,
88-
utf8_payload_profile,
89-
);
48+
let partitions =
49+
create_record_batches(&schema, array_len, partitions_len, batch_size);
9050
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
9151
MemTable::try_new(schema, partitions).map(Arc::new)
9252
}
@@ -131,14 +91,12 @@ fn create_record_batch(
13191
rng: &mut StdRng,
13292
batch_size: usize,
13393
batch_index: usize,
134-
payloads: &[String; 4],
13594
) -> RecordBatch {
13695
// Randomly choose from 4 distinct key values; a higher number increases sparseness.
13796
let key_suffixes = [0, 1, 2, 3];
138-
let keys = StringArray::from_iter_values((0..batch_size).map(|_| {
139-
let suffix = *key_suffixes.choose(rng).unwrap();
140-
payloads[suffix].as_str()
141-
}));
97+
let keys = StringArray::from_iter_values(
98+
(0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())),
99+
);
142100

143101
let values = create_data(rng, batch_size, 0.5);
144102

@@ -188,12 +146,10 @@ pub fn create_record_batches(
188146
array_len: usize,
189147
partitions_len: usize,
190148
batch_size: usize,
191-
utf8_payload_profile: Utf8PayloadProfile,
192149
) -> Vec<Vec<RecordBatch>> {
193150
let mut rng = StdRng::seed_from_u64(42);
194151
let mut partitions = Vec::with_capacity(partitions_len);
195152
let batches_per_partition = array_len / batch_size / partitions_len;
196-
let payloads = utf8_payload_profile.payloads();
197153

198154
for _ in 0..partitions_len {
199155
let mut batches = Vec::with_capacity(batches_per_partition);
@@ -203,37 +159,13 @@ pub fn create_record_batches(
203159
&mut rng,
204160
batch_size,
205161
batch_index,
206-
&payloads,
207162
));
208163
}
209164
partitions.push(batches);
210165
}
211166
partitions
212167
}
213168

214-
impl Utf8PayloadProfile {
215-
fn all() -> [Self; 3] {
216-
[Self::Small, Self::Medium, Self::Large]
217-
}
218-
219-
fn payloads(self) -> [String; 4] {
220-
std::array::from_fn(|idx| match self {
221-
Self::Small => format!("hi{idx}"),
222-
Self::Medium => payload_string("mid", idx, 64),
223-
Self::Large => payload_string("large", idx, 1024),
224-
})
225-
}
226-
}
227-
228-
fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String {
229-
let mut value = format!("{prefix}{suffix}_");
230-
value.extend(std::iter::repeat_n(
231-
(b'a' + suffix as u8) as char,
232-
target_len - value.len(),
233-
));
234-
value
235-
}
236-
237169
/// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder
238170
/// so that both can be used interchangeably.
239171
enum TraceIdBuilder {

0 commit comments

Comments
 (0)