Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 119 additions & 3 deletions arrow-ipc/benches/ipc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{RecordBatch, builder::StringBuilder};
use arrow_array::RecordBatch;
use arrow_array::builder::{
Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
};
use arrow_array::types::UInt32Type;
use arrow_ipc::CompressionType;
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter};
use arrow_schema::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
use std::sync::Arc;
Expand Down Expand Up @@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
writer.finish().unwrap();
})
});

group.bench_function("StreamWriter/write_10/dict", |b| {
let batches = create_unique_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
b.iter(move || {
buffer.clear();
let mut writer = StreamWriter::try_new(&mut buffer, schema.as_ref()).unwrap();
for batch in &batches {
writer.write(batch).unwrap();
}
writer.finish().unwrap();
})
});

group.bench_function("StreamWriter/write_10/dict/delta", |b| {
let batches = create_delta_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);

b.iter(move || {
buffer.clear();

let mut writer =
StreamWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
.unwrap();

for batch in &batches {
writer.write(batch).unwrap();
}

writer.finish().unwrap();
})
});

// The file writer rejects dictionary replacement, so only the delta case is
// exercised here (growing dictionaries that are prefixes of one another).
group.bench_function("FileWriter/write_10/dict/delta", |b| {
let batches = create_delta_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);

b.iter(move || {
buffer.clear();

let mut writer =
FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
.unwrap();

for batch in &batches {
writer.write(batch).unwrap();
}

writer.finish().unwrap();
})
});
}

/// Produce `n` single-column record batches whose dictionary keeps growing
/// from one batch to the next.
///
/// Every batch has one non-nullable `d0` column of type
/// `Dictionary(UInt32, Utf8)` holding `num_rows` values. The same builder is
/// used for all batches and is finished with `finish_preserve_values`, so the
/// dictionary values accumulated for earlier batches are carried into later
/// ones; each batch's dictionary therefore starts with the previous batch's
/// dictionary, which is the shape needed for delta dictionary emission.
fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8));
    let schema = Arc::new(Schema::new(vec![Field::new("d0", dict_type, false)]));

    // Deliberately created once, outside the per-batch closure: the values
    // must survive across batches for the dictionary to grow.
    let mut builder = StringDictionaryBuilder::<UInt32Type>::new();

    // Rows below this index get a value that embeds the batch number (new to
    // the dictionary); the remaining rows use values common to every batch.
    let unique_rows = num_rows / 4;

    (0..n)
        .map(|i| {
            for r in 0..num_rows {
                let value = if r < unique_rows {
                    format!("batch {i} value {}", r)
                } else {
                    format!("shared {r}")
                };
                builder.append_value(value);
            }

            // Finishing this way keeps the values builder intact, so the next
            // batch extends the same dictionary instead of starting over.
            let column = builder.finish_preserve_values();
            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(column)]).unwrap()
        })
        .collect()
}

/// Build `n` record batches, each with its own completely distinct dictionary.
///
/// Every batch has one non-nullable `d0` column of type
/// `Dictionary(UInt32, Utf8)` holding `num_rows` values, built with a fresh
/// builder. All values embed the batch index, so no dictionary value is shared
/// between batches. Within a batch the values cycle with period
/// `num_rows / 2`, giving at most `num_rows / 2` distinct entries (a single
/// entry when `num_rows` is 1).
fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "d0",
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
        false,
    )]));

    // Clamp to 1: `num_rows / 2` is 0 when `num_rows == 1`, and taking a
    // remainder by zero panics. For `num_rows >= 2` this is unchanged.
    let distinct_values = (num_rows / 2).max(1);

    let mut batches = Vec::with_capacity(n);
    for i in 0..n {
        // A new builder per batch guarantees nothing carries over between
        // dictionaries.
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for r in 0..num_rows {
            builder.append_value(format!("batch {i} value {}", r % distinct_values));
        }
        let dict = builder.finish();
        batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
    }

    batches
}

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
Expand Down
Loading
Loading