From 71e5f8f6cda3c0ee1ab3564e28c27bdf66ce9aa1 Mon Sep 17 00:00:00 2001
From: Jake Dern
Date: Thu, 11 Jun 2026 09:42:56 -0700
Subject: [PATCH] Add benches

---
 arrow-ipc/benches/ipc_writer.rs | 122 +++++++++++++++++++++++++++++++-
 1 file changed, 119 insertions(+), 3 deletions(-)

diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs
index eda7e3c58fe0..5050ff6cd112 100644
--- a/arrow-ipc/benches/ipc_writer.rs
+++ b/arrow-ipc/benches/ipc_writer.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
-use arrow_array::{RecordBatch, builder::StringBuilder};
+use arrow_array::RecordBatch;
+use arrow_array::builder::{
+    Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
+};
+use arrow_array::types::UInt32Type;
 use arrow_ipc::CompressionType;
-use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
+use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter};
 use arrow_schema::{DataType, Field, Schema};
 use criterion::{Criterion, criterion_group, criterion_main};
 use std::sync::Arc;
@@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
             writer.finish().unwrap();
         })
     });
+
+    group.bench_function("StreamWriter/write_10/dict", |b| {
+        let batches = create_unique_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        b.iter(move || {
+            buffer.clear();
+            let mut writer = StreamWriter::try_new(&mut buffer, schema.as_ref()).unwrap();
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+            writer.finish().unwrap();
+        })
+    });
+
+    group.bench_function("StreamWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                StreamWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+
+    // The file writer rejects dictionary replacement, so only the delta case is
+    // exercised here (growing dictionaries that are prefixes of one another).
+    group.bench_function("FileWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+}
+
+/// Build `n` record batches with a single dictionary column whose dictionary
+/// grows across batches. A single builder is reused with `finish_preserve_values`
+/// so each batch's dictionary has the previous batch's as a prefix which allows
+/// us to emit deltas.
+fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
+        false,
+    )]));
+    let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        // 3/4 of the rows reuse values shared by every batch, the other 1/4
+        // introduce values unique to this batch which extends the dictionary.
+        for r in 0..num_rows {
+            if r < num_rows / 4 {
+                builder.append_value(format!("batch {i} value {}", r));
+            } else {
+                builder.append_value(format!("shared {r}"));
+            }
+        }
+
+        // Preserve the values builder so the dictionary accumulates across batches.
+        let dict = builder.finish_preserve_values();
+        batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
+}
+
+/// Build `n` record batches, each with its own completely distinct dictionary.
+fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
+        false,
+    )]));
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+        for r in 0..num_rows {
+            builder.append_value(format!("batch {i} value {}", r % (num_rows / 2)));
+        }
+        let dict = builder.finish();
+        batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
 }
 
 fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {