Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 119 additions & 3 deletions arrow-ipc/benches/ipc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{RecordBatch, builder::StringBuilder};
use arrow_array::RecordBatch;
use arrow_array::builder::{
Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
};
use arrow_array::types::UInt32Type;
use arrow_ipc::CompressionType;
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter};
use arrow_schema::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
use std::sync::Arc;
Expand Down Expand Up @@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
writer.finish().unwrap();
})
});

group.bench_function("StreamWriter/write_10/dict", |b| {
let batches = create_unique_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
b.iter(move || {
buffer.clear();
let mut writer = StreamWriter::try_new(&mut buffer, schema.as_ref()).unwrap();
for batch in &batches {
writer.write(batch).unwrap();
}
writer.finish().unwrap();
})
});

group.bench_function("StreamWriter/write_10/dict/delta", |b| {
let batches = create_delta_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);

b.iter(move || {
buffer.clear();

let mut writer =
StreamWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
.unwrap();

for batch in &batches {
writer.write(batch).unwrap();
}

writer.finish().unwrap();
})
});

// The file writer rejects dictionary replacement, so only the delta case is
// exercised here (growing dictionaries that are prefixes of one another).
group.bench_function("FileWriter/write_10/dict/delta", |b| {
let batches = create_delta_dict_batches(10, 8192);
let schema = batches[0].schema();
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options =
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);

b.iter(move || {
buffer.clear();

let mut writer =
FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone())
.unwrap();

for batch in &batches {
writer.write(batch).unwrap();
}

writer.finish().unwrap();
})
});
}

/// Build `n` record batches with a single dictionary column whose dictionary
/// grows across batches. A single builder is reused with `finish_preserve_values`
/// so each batch's dictionary has the previous batch's as a prefix which allows
/// us to emit deltas.
fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
let schema = Arc::new(Schema::new(vec![Field::new(
"d0",
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
false,
)]));
let mut builder = StringDictionaryBuilder::<UInt32Type>::new();

let mut batches = Vec::with_capacity(n);
for i in 0..n {
// 3/4 of the rows reuse values shared by every batch, the other 1/4
// introduce values unique to this batch which extends the dictionary.
for r in 0..num_rows {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to before.

if r < num_rows / 4 {
builder.append_value(format!("batch {i} value {}", r));
} else {
builder.append_value(format!("shared {r}"));
}
}

// Preserve the values builder so the dictionary accumulates across batches.
let dict = builder.finish_preserve_values();
batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
}

batches
}

/// Build `n` record batches each with a completely distinct dictionary for each batch.
fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
let schema = Arc::new(Schema::new(vec![Field::new(
"d0",
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
false,
)]));

let mut batches = Vec::with_capacity(n);
for i in 0..n {
let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
for r in 0..num_rows {
builder.append_value(format!("batch {i} value {}", r % (num_rows / 2)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For an 8k batch size there will be 4k unique values, and for 64k there will be 32k unique values. I don't think this alone is the right way to benchmark dictionaries. Dictionaries are focused on low cardinality, so it makes more sense to parameterize benchmarks by target cardinality. For example, (5%, 10%, 25%, 50%) unique values relative to batch size.
The implementation should also grow with the number of unique values, since that's the point of the IPC format in delta mode:

A dictionary batch with isDelta set indicates that its vector should be concatenated with those of any previous batches with the same id.

Varying cardinality helps detect whether the encoder is doing O(N) work (proportional to total rows) when it should be doing O(K) work (proportional to unique values).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the thoughtful review!

Varying cardinality helps detect whether the encoder is doing O(N) work (proportional to total rows) when it should be doing O(K) work (proportional to unique values).

I would clarify that we are doing O(K + N) work as there is still per row overhead in encoding the dictionary keys as well as per unique value cost in encoding the dictionary values.

I agree that benchmarks varying the amount of unique values could yield useful information, but these simple benchmarks can still answer whether we've meaningfully changed the amount of work required to emit dictionary batches.

I think until those additional parameters are needed to demonstrate something the current set cannot, I favor the simplicity of omitting them.

Just my .02 of course, happy to take more feedback on this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to me.

}
let dict = builder.finish();
batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
}

batches
}

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
Expand Down
Loading