From d0dacffa171df359b5dc9aa6e5ee9176a2689b27 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 15:36:40 -0700 Subject: [PATCH 01/19] c --- arrow-ipc/benches/ipc_writer.rs | 42 +++++- arrow-ipc/src/writer.rs | 249 +++++++++++++++++++++++--------- 2 files changed, 222 insertions(+), 69 deletions(-) diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index eda7e3c58fe0..5c17d6e8d617 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -16,7 +16,8 @@ // under the License. use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; -use arrow_array::{RecordBatch, builder::StringBuilder}; +use arrow_array::types::Int32Type; +use arrow_array::{DictionaryArray, RecordBatch, builder::StringBuilder}; use arrow_ipc::CompressionType; use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; use arrow_schema::{DataType, Field, Schema}; @@ -69,6 +70,45 @@ fn criterion_benchmark(c: &mut Criterion) { writer.finish().unwrap(); }) }); + + // Each batch carries a distinct dictionary, so every `write` emits a + // dictionary (replacement) message followed by the record batch. This + // exercises the dictionary-message encoding path. + group.bench_function("StreamWriter/write_10/dict", |b| { + let batches = create_dict_batches(10, 8192, 2048); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + b.iter(move || { + buffer.clear(); + let mut writer = StreamWriter::try_new(&mut buffer, schema.as_ref()).unwrap(); + for batch in &batches { + writer.write(batch).unwrap(); + } + writer.finish().unwrap(); + }) + }); +} + +/// Build `n` record batches, each with a single `Dictionary(Int32, Utf8)` +/// column. Each batch uses a distinct set of dictionary values so that the +/// writer must emit a fresh dictionary message for every batch. +fn create_dict_batches(n: usize, num_rows: usize, dict_len: usize) -> Vec { + let schema = Arc::new(Schema::new(vec![Field::new( + "d0", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + )])); + (0..n) + .map(|batch_idx| { + // Distinct values per batch force a new dictionary message each time. + let values: Vec = (0..dict_len) + .map(|v| format!("batch {batch_idx} dictionary value number {v}")) + .collect(); + let keys = (0..num_rows).map(|i| values[i % dict_len].as_str()); + let dict: DictionaryArray = keys.collect(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap() + }) + .collect() } fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4142858ce80c..8a80efd8ad2d 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -270,6 +270,17 @@ impl Default for IpcWriteOptions { /// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc pub struct IpcDataGenerator {} +/// A dictionary determined (during the dictionary walk) to need sending, captured +/// so it can be serialised to the output afterwards. Holding the `ArrayData` +/// (a cheap clone of `Arc`-backed buffers) lets the walk stay separate from +/// serialisation, so the latter can reuse a single [`FlatBufferBuilder`] and +/// stream bodies zero-copy. +struct DictionaryToEncode { + dict_id: i64, + data: ArrayData, + is_delta: bool, +} + impl IpcDataGenerator { /// Converts a schema to an IPC message along with `dictionary_tracker` /// and returns it encoded inside [EncodedData] as a flatbuffer. @@ -306,7 +317,7 @@ impl IpcDataGenerator { fn _encode_dictionaries>( &self, column: &ArrayRef, - encoded_dictionaries: &mut Vec, + encoded_dictionaries: &mut Vec, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, @@ -467,7 +478,7 @@ impl IpcDataGenerator { &self, field: &Field, column: &ArrayRef, - encoded_dictionaries: &mut Vec, + encoded_dictionaries: &mut Vec, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, @@ -508,22 +519,18 @@ impl IpcDataGenerator { )? { DictionaryUpdate::None => {} DictionaryUpdate::New | DictionaryUpdate::Replaced => { - encoded_dictionaries.push(self.dictionary_batch_to_bytes( + encoded_dictionaries.push(DictionaryToEncode { dict_id, - dict_values, - write_options, - false, - compression_context, - )?); + data: dict_values.clone(), + is_delta: false, + }); } DictionaryUpdate::Delta(data) => { - encoded_dictionaries.push(self.dictionary_batch_to_bytes( + encoded_dictionaries.push(DictionaryToEncode { dict_id, - &data, - write_options, - true, - compression_context, - )?); + data, + is_delta: true, + }); } } } @@ -550,24 +557,36 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { - let encoded_dictionaries = self.encode_all_dicts( + let dictionaries = self.encode_all_dicts( batch, dictionary_tracker, write_options, compression_context, )?; + let mut encoded_dictionaries = Vec::with_capacity(dictionaries.len()); + for dict in dictionaries { + encoded_dictionaries.push(self.dictionary_batch_to_bytes( + dict.dict_id, + &dict.data, + write_options, + dict.is_delta, + compression_context, + )?); + } + let mut fbb = FlatBufferBuilder::new(); let mut arrow_data = Vec::new(); - let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( + let (_, tail_pad) = self.record_batch_to_bytes( batch, write_options, compression_context, &mut IpcBodySink::Write(&mut arrow_data), + &mut fbb, )?; arrow_data.extend_from_slice(&PADDING[..tail_pad]); Ok(( encoded_dictionaries, EncodedData { - ipc_message, + ipc_message: fbb.finished_data().to_vec(), arrow_data, }, )) @@ -580,7 +599,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, - ) -> Result, ArrowError> { + ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); @@ -608,56 +627,65 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: &mut W, + fbb: &mut FlatBufferBuilder<'static>, ) -> Result { - let encoded_dictionaries = self.encode_all_dicts( + let dictionaries = self.encode_all_dicts( batch, dictionary_tracker, write_options, compression_context, )?; - let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len()); - for dict in encoded_dictionaries { - dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); - } - + // Scratch reused for every message's body buffers; dictionaries and the + // record batch all stream their (zero-copy) buffers through it. let capacity = batch .columns() .iter() .map(|a| estimate_encoded_buffer_count(a.data_type())) .sum(); let mut encoded_buffers: Vec = Vec::with_capacity(capacity); - let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes( + + let mut dictionary_block_sizes = Vec::with_capacity(dictionaries.len()); + for dict in &dictionaries { + encoded_buffers.clear(); + let (body_len, tail_pad) = self.dictionary_batch_to_sink( + dict.dict_id, + &dict.data, + write_options, + dict.is_delta, + compression_context, + &mut IpcBodySink::Collect(&mut encoded_buffers), + fbb, + )?; + let header_len = write_encoded_message_direct( + &mut *writer, + fbb.finished_data(), + &encoded_buffers, + tail_pad, + write_options, + )?; + dictionary_block_sizes.push((header_len, body_len)); + } + + encoded_buffers.clear(); + let (body_len, tail_pad) = self.record_batch_to_bytes( batch, write_options, compression_context, &mut IpcBodySink::Collect(&mut encoded_buffers), + fbb, )?; - - let alignment = write_options.alignment; - let a = usize::from(alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; - let aligned_size = (ipc_message.len() + prefix_size + a) & !a; - write_continuation( + let padded_header_len = write_encoded_message_direct( &mut *writer, + fbb.finished_data(), + &encoded_buffers, + tail_pad, write_options, - (aligned_size - prefix_size) as i32, )?; - writer.write_all(&ipc_message)?; - writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; - for enc in &encoded_buffers { - writer.write_all(enc.as_slice())?; - writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; - } - writer.write_all(&PADDING[..tail_pad])?; Ok(IpcWriteMetadata { dictionary_block_sizes, - padded_header_len: aligned_size, + padded_header_len, body_len, }) } @@ -691,13 +719,16 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, - ) -> Result<(Vec, usize, usize), ArrowError> { - let mut fbb = FlatBufferBuilder::new(); + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result<(usize, usize), ArrowError> { + // Reuse the builder's internal buffer across messages; `reset` keeps the + // allocated capacity and only clears the in-progress state. + fbb.reset(); let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); + let mut c = crate::BodyCompressionBuilder::new(&mut *fbb); c.add_method(crate::BodyCompressionMethod::BUFFER); c.add_codec(batch_compression_type); c.finish() @@ -737,7 +768,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(&mut *fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -750,7 +781,7 @@ impl IpcDataGenerator { batch_builder.finish().as_union_value() }; // create an crate::Message - let mut message = crate::MessageBuilder::new(&mut fbb); + let mut message = crate::MessageBuilder::new(&mut *fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); @@ -758,28 +789,38 @@ impl IpcDataGenerator { let root = message.finish(); fbb.finish(root, None); - Ok((fbb.finished_data().to_vec(), body_len, tail_pad)) + // The finished metadata lives in `fbb`'s internal buffer; callers read it + // via `fbb.finished_data()` to avoid an intermediate copy. + Ok((body_len, tail_pad)) } - /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the - /// other for the data - fn dictionary_batch_to_bytes( + /// Encodes a dictionary batch's flatbuffer metadata into `fbb` (read back via + /// `fbb.finished_data()`) and fills `sink` with its body buffers. + /// + /// Returns `(body_len, tail_pad)`: the total body length including trailing + /// padding, and the trailing alignment padding byte count. + fn dictionary_batch_to_sink( &self, dict_id: i64, array_data: &ArrayData, write_options: &IpcWriteOptions, is_delta: bool, compression_context: &mut CompressionContext, - ) -> Result { - let mut fbb = FlatBufferBuilder::new(); + sink: &mut IpcBodySink<'_>, + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result<(usize, usize), ArrowError> { + // Reuse the builder's internal buffer across messages; `reset` keeps the + // allocated capacity and only clears the in-progress state. + fbb.reset(); - let mut arrow_data: Vec = vec![]; + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; // get the type of compression let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); + let mut c = crate::BodyCompressionBuilder::new(&mut *fbb); c.add_method(crate::BodyCompressionMethod::BUFFER); c.add_codec(batch_compression_type); c.finish() @@ -790,12 +831,11 @@ impl IpcDataGenerator { .transpose()?; let alignment = write_options.alignment; - let mut meta = IpcMetadataBuilder::default(); - let mut sink = IpcBodySink::Write(&mut arrow_data); let offset = write_array_data( array_data, - &mut meta, - &mut sink, + &mut buffers, + sink, + &mut nodes, 0, compression_codec, compression_context, @@ -805,10 +845,8 @@ impl IpcDataGenerator { let mut variadic_buffer_counts = vec![]; append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); - // pad the tail of body data let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); // write data let buffers = fbb.create_vector(&meta.buffers); @@ -820,7 +858,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(&mut *fbb); batch_builder.add_length(array_data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -834,7 +872,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut *fbb); batch_builder.add_id(dict_id); batch_builder.add_data(root); batch_builder.add_isDelta(is_delta); @@ -842,7 +880,7 @@ impl IpcDataGenerator { }; let root = { - let mut message_builder = crate::MessageBuilder::new(&mut fbb); + let mut message_builder = crate::MessageBuilder::new(&mut *fbb); message_builder.add_version(write_options.metadata_version); message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); message_builder.add_bodyLength(body_len as i64); @@ -851,10 +889,35 @@ impl IpcDataGenerator { }; fbb.finish(root, None); - let finished_data = fbb.finished_data(); + + Ok((body_len, tail_pad)) + } + + /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the + /// other for the data + fn dictionary_batch_to_bytes( + &self, + dict_id: i64, + array_data: &ArrayData, + write_options: &IpcWriteOptions, + is_delta: bool, + compression_context: &mut CompressionContext, + ) -> Result { + let mut fbb = FlatBufferBuilder::new(); + let mut arrow_data: Vec = vec![]; + let (_, tail_pad) = self.dictionary_batch_to_sink( + dict_id, + array_data, + write_options, + is_delta, + compression_context, + &mut IpcBodySink::Write(&mut arrow_data), + &mut fbb, + )?; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); Ok(EncodedData { - ipc_message: finished_data.to_vec(), + ipc_message: fbb.finished_data().to_vec(), arrow_data, }) } @@ -1239,6 +1302,10 @@ pub struct FileWriter { data_gen: IpcDataGenerator, compression_context: CompressionContext, + + /// Reusable flatbuffer builder, shared across all messages to avoid + /// reallocating its internal buffer on every batch. + fbb: FlatBufferBuilder<'static>, } impl FileWriter> { @@ -1301,6 +1368,7 @@ impl FileWriter { custom_metadata: HashMap::new(), data_gen, compression_context: CompressionContext::default(), + fbb: FlatBufferBuilder::new(), }) } @@ -1323,6 +1391,7 @@ impl FileWriter { &self.write_options, &mut self.compression_context, &mut self.writer, + &mut self.fbb, )?; for (header_len, body_len) in meta.dictionary_block_sizes { @@ -1530,6 +1599,10 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, compression_context: CompressionContext, + + /// Reusable flatbuffer builder, shared across all messages to avoid + /// reallocating its internal buffer on every batch. + fbb: FlatBufferBuilder<'static>, } impl StreamWriter> { @@ -1581,6 +1654,7 @@ impl StreamWriter { dictionary_tracker, data_gen, compression_context: CompressionContext::default(), + fbb: FlatBufferBuilder::new(), }) } @@ -1598,6 +1672,7 @@ impl StreamWriter { &self.write_options, &mut self.compression_context, &mut self.writer, + &mut self.fbb, )?; Ok(()) } @@ -1701,6 +1776,44 @@ pub struct EncodedData { /// Arrow buffers to be written, should be an empty vec for schema messages pub arrow_data: Vec, } +/// Write a single message directly to `writer`: the continuation prefix, the +/// flatbuffer metadata (`ipc_message`) with alignment padding, then each body +/// buffer with its per-buffer padding, then the trailing body padding. +/// +/// `ipc_message` is the raw flatbuffer header (without continuation prefix), and +/// `encoded_buffers` are the already-encoded body buffers (zero-copy or +/// compressed). Returns the padded header length (continuation + metadata + +/// padding), needed for IPC footer blocks. +fn write_encoded_message_direct( + writer: &mut W, + ipc_message: &[u8], + encoded_buffers: &[EncodedBuffer], + tail_pad: usize, + write_options: &IpcWriteOptions, +) -> Result { + let alignment = write_options.alignment; + let a = usize::from(alignment - 1); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (ipc_message.len() + prefix_size + a) & !a; + write_continuation( + &mut *writer, + write_options, + (aligned_size - prefix_size) as i32, + )?; + writer.write_all(ipc_message)?; + writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; + for enc in encoded_buffers { + writer.write_all(enc.as_slice())?; + writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; + } + writer.write_all(&PADDING[..tail_pad])?; + Ok(aligned_size) +} + /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message( mut writer: W, From 57b07ab027cee816f6c4cd0222ff06dfd650baa5 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 16:13:35 -0700 Subject: [PATCH 02/19] Remove compression wiring --- arrow-ipc/src/writer.rs | 37 +++++-------------------------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 8a80efd8ad2d..f79d18bf6307 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -321,7 +321,6 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, - compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Struct(fields) => { @@ -334,7 +333,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } } @@ -356,7 +354,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::List(field) => { @@ -368,7 +365,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::LargeList(field) => { @@ -380,7 +376,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::ListView(field) => { @@ -392,7 +387,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::LargeListView(field) => { @@ -404,7 +398,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::FixedSizeList(field, _) => { @@ -419,7 +412,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::Map(field, _) => { @@ -438,7 +430,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; // values @@ -449,7 +440,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::Union(fields, _) => { @@ -463,7 +453,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, - compression_context, )?; } } @@ -482,7 +471,6 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, - compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { @@ -497,7 +485,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, - compression_context, )?; // It's important to only take the dict_id at this point, because the dict ID @@ -540,7 +527,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, - compression_context, )?, } @@ -557,12 +543,7 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { - let dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; + let dictionaries = self.collect_all_dicts(batch, dictionary_tracker, write_options)?; let mut encoded_dictionaries = Vec::with_capacity(dictionaries.len()); for dict in dictionaries { encoded_dictionaries.push(self.dictionary_batch_to_bytes( @@ -592,13 +573,12 @@ impl IpcDataGenerator { )) } - /// Encode dictionary batches for all columns in `batch`. - fn encode_all_dicts( + /// Walk the record batch and collect dictionaries for later encoding. + fn collect_all_dicts( &self, batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); @@ -611,7 +591,6 @@ impl IpcDataGenerator { dictionary_tracker, write_options, &mut dict_id, - compression_context, )?; } Ok(encoded_dictionaries) @@ -629,15 +608,8 @@ impl IpcDataGenerator { writer: &mut W, fbb: &mut FlatBufferBuilder<'static>, ) -> Result { - let dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; + let dictionaries = self.collect_all_dicts(batch, dictionary_tracker, write_options)?; - // Scratch reused for every message's body buffers; dictionaries and the - // record batch all stream their (zero-copy) buffers through it. let capacity = batch .columns() .iter() @@ -799,6 +771,7 @@ impl IpcDataGenerator { /// /// Returns `(body_len, tail_pad)`: the total body length including trailing /// padding, and the trailing alignment padding byte count. + #[allow(clippy::too_many_arguments)] fn dictionary_batch_to_sink( &self, dict_id: i64, From d24dcd8364bc129547d65c120fd87cd6a1e12077 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 16:22:17 -0700 Subject: [PATCH 03/19] Update function names --- arrow-ipc/src/writer.rs | 49 ++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index f79d18bf6307..25080ebfe6d2 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -314,7 +314,7 @@ impl IpcDataGenerator { } } - fn _encode_dictionaries>( + fn _collect_dict_updates>( &self, column: &ArrayRef, encoded_dictionaries: &mut Vec, @@ -326,7 +326,7 @@ impl IpcDataGenerator { DataType::Struct(fields) => { let s = as_struct_array(column); for (field, column) in fields.iter().zip(s.columns()) { - self.encode_dictionaries( + self.collect_dict_updates( field, column, encoded_dictionaries, @@ -347,7 +347,7 @@ impl IpcDataGenerator { // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries // only for values array. let values_array = make_array(data.child_data()[1].clone()); - self.encode_dictionaries( + self.collect_dict_updates( values, &values_array, encoded_dictionaries, @@ -358,7 +358,7 @@ impl IpcDataGenerator { } DataType::List(field) => { let list = as_list_array(column); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, @@ -369,7 +369,7 @@ impl IpcDataGenerator { } DataType::LargeList(field) => { let list = as_large_list_array(column); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, @@ -380,7 +380,7 @@ impl IpcDataGenerator { } DataType::ListView(field) => { let list = column.as_list_view::(); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, @@ -391,7 +391,7 @@ impl IpcDataGenerator { } DataType::LargeListView(field) => { let list = column.as_list_view::(); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, @@ -405,7 +405,7 @@ impl IpcDataGenerator { .as_any() .downcast_ref::() .expect("Unable to downcast to fixed size list array"); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, @@ -423,7 +423,7 @@ impl IpcDataGenerator { }; // keys - self.encode_dictionaries( + self.collect_dict_updates( keys, map_array.keys(), encoded_dictionaries, @@ -433,7 +433,7 @@ impl IpcDataGenerator { )?; // values - self.encode_dictionaries( + self.collect_dict_updates( values, map_array.values(), encoded_dictionaries, @@ -446,7 +446,7 @@ impl IpcDataGenerator { let union = as_union_array(column); for (type_id, field) in fields.iter() { let column = union.child(type_id); - self.encode_dictionaries( + self.collect_dict_updates( field, column, encoded_dictionaries, @@ -462,12 +462,15 @@ impl IpcDataGenerator { Ok(()) } + // Collect all dicts that need a dictionary message i.e. ones that were + // either not in the tracker previously or ones that were but need a + // replacement or delta. #[allow(clippy::too_many_arguments)] - fn encode_dictionaries>( + fn collect_dict_updates>( &self, field: &Field, column: &ArrayRef, - encoded_dictionaries: &mut Vec, + dictionaries: &mut Vec, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, @@ -479,9 +482,9 @@ impl IpcDataGenerator { let values = make_array(dict_data.child_data()[0].clone()); - self._encode_dictionaries( + self._collect_dict_updates( &values, - encoded_dictionaries, + dictionaries, dictionary_tracker, write_options, dict_id_seq, @@ -506,14 +509,14 @@ impl IpcDataGenerator { )? { DictionaryUpdate::None => {} DictionaryUpdate::New | DictionaryUpdate::Replaced => { - encoded_dictionaries.push(DictionaryToEncode { + dictionaries.push(DictionaryToEncode { dict_id, data: dict_values.clone(), is_delta: false, }); } DictionaryUpdate::Delta(data) => { - encoded_dictionaries.push(DictionaryToEncode { + dictionaries.push(DictionaryToEncode { dict_id, data, is_delta: true, @@ -521,9 +524,9 @@ impl IpcDataGenerator { } } } - _ => self._encode_dictionaries( + _ => self._collect_dict_updates( column, - encoded_dictionaries, + dictionaries, dictionary_tracker, write_options, dict_id_seq, @@ -581,19 +584,19 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, ) -> Result, ArrowError> { let schema = batch.schema(); - let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); + let mut dictionaries = Vec::with_capacity(schema.flattened_fields().len()); let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); for (i, field) in schema.fields().iter().enumerate() { - self.encode_dictionaries( + self.collect_dict_updates( field, batch.column(i), - &mut encoded_dictionaries, + &mut dictionaries, dictionary_tracker, write_options, &mut dict_id, )?; } - Ok(encoded_dictionaries) + Ok(dictionaries) } /// Write dictionary batches and the record batch directly to `writer`, skipping the From 32d28ce58bb3dddbbe2a0a59fce2ad08ff29bcae Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 16:45:07 -0700 Subject: [PATCH 04/19] clean up arguments --- arrow-ipc/src/writer.rs | 41 ++++++++++++++++------------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 25080ebfe6d2..75e20c3f45bf 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -276,7 +276,7 @@ pub struct IpcDataGenerator {} /// serialisation, so the latter can reuse a single [`FlatBufferBuilder`] and /// stream bodies zero-copy. struct DictionaryToEncode { - dict_id: i64, + id: i64, data: ArrayData, is_delta: bool, } @@ -510,14 +510,14 @@ impl IpcDataGenerator { DictionaryUpdate::None => {} DictionaryUpdate::New | DictionaryUpdate::Replaced => { dictionaries.push(DictionaryToEncode { - dict_id, + id: dict_id, data: dict_values.clone(), is_delta: false, }); } DictionaryUpdate::Delta(data) => { dictionaries.push(DictionaryToEncode { - dict_id, + id: dict_id, data, is_delta: true, }); @@ -550,10 +550,8 @@ impl IpcDataGenerator { let mut encoded_dictionaries = Vec::with_capacity(dictionaries.len()); for dict in dictionaries { encoded_dictionaries.push(self.dictionary_batch_to_bytes( - dict.dict_id, - &dict.data, + dict, write_options, - dict.is_delta, compression_context, )?); } @@ -612,6 +610,7 @@ impl IpcDataGenerator { fbb: &mut FlatBufferBuilder<'static>, ) -> Result { let dictionaries = self.collect_all_dicts(batch, dictionary_tracker, write_options)?; + let mut dictionary_block_sizes = Vec::with_capacity(dictionaries.len()); let capacity = batch .columns() @@ -620,14 +619,11 @@ impl IpcDataGenerator { .sum(); let mut encoded_buffers: Vec = Vec::with_capacity(capacity); - let mut dictionary_block_sizes = Vec::with_capacity(dictionaries.len()); for dict in &dictionaries { encoded_buffers.clear(); let (body_len, tail_pad) = self.dictionary_batch_to_sink( - dict.dict_id, - &dict.data, + dict, write_options, - dict.is_delta, compression_context, &mut IpcBodySink::Collect(&mut encoded_buffers), fbb, @@ -774,13 +770,10 @@ impl IpcDataGenerator { /// /// Returns `(body_len, tail_pad)`: the total body length including trailing /// padding, and the trailing alignment padding byte count. - #[allow(clippy::too_many_arguments)] fn dictionary_batch_to_sink( &self, - dict_id: i64, - array_data: &ArrayData, + dict: &DictionaryToEncode, write_options: &IpcWriteOptions, - is_delta: bool, compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, fbb: &mut FlatBufferBuilder<'static>, @@ -808,18 +801,20 @@ impl IpcDataGenerator { let alignment = write_options.alignment; let offset = write_array_data( - array_data, + &dict.data, &mut buffers, sink, &mut nodes, 0, + dict.data.len(), + dict.data.null_count(), compression_codec, compression_context, write_options, )?; let mut variadic_buffer_counts = vec![]; - append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); + append_variadic_buffer_counts(&mut variadic_buffer_counts, &dict.data); let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; @@ -835,7 +830,7 @@ impl IpcDataGenerator { let root = { let mut batch_builder = crate::RecordBatchBuilder::new(&mut *fbb); - batch_builder.add_length(array_data.len() as i64); + batch_builder.add_length(dict.data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); if let Some(c) = compression { @@ -849,9 +844,9 @@ impl IpcDataGenerator { let root = { let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut *fbb); - batch_builder.add_id(dict_id); + batch_builder.add_id(dict.id); batch_builder.add_data(root); - batch_builder.add_isDelta(is_delta); + batch_builder.add_isDelta(dict.is_delta); batch_builder.finish().as_union_value() }; @@ -873,19 +868,15 @@ impl IpcDataGenerator { /// other for the data fn dictionary_batch_to_bytes( &self, - dict_id: i64, - array_data: &ArrayData, + dict: DictionaryToEncode, write_options: &IpcWriteOptions, - is_delta: bool, compression_context: &mut CompressionContext, ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut arrow_data: Vec = vec![]; let (_, tail_pad) = self.dictionary_batch_to_sink( - dict_id, - array_data, + &dict, write_options, - is_delta, compression_context, &mut IpcBodySink::Write(&mut arrow_data), &mut fbb, From 3924e29746dcaf0c324af7f7f742a59264d184cf Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:01:56 -0700 Subject: [PATCH 05/19] Fix up reborrow --- arrow-ipc/src/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 75e20c3f45bf..c34a62a18b7a 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -789,7 +789,7 @@ impl IpcDataGenerator { let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut *fbb); + let mut c = crate::BodyCompressionBuilder::new(fbb); c.add_method(crate::BodyCompressionMethod::BUFFER); c.add_codec(batch_compression_type); c.finish() From 0a0bfb7c6c1fe1bc69be13ba17f917c603d5faf2 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:11:13 -0700 Subject: [PATCH 06/19] documentation --- arrow-ipc/src/writer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index c34a62a18b7a..3b0d2fdd65c1 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -682,8 +682,12 @@ impl IpcDataGenerator { /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the /// serialised buffer data. /// - /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the - /// total body length including trailing padding, and the trailing alignment padding byte count. + /// Returns `(body_len, tail_pad)`: the total body length including trailing padding, + /// and the trailing alignment padding byte count. + /// + /// The message header is located in the provided FlatBufferBuilder's finished + /// bytes. A successful Result from this function guarantees the fbb is in + /// a finished state to call [`FlatBufferBuilder::finished_data`]. fn record_batch_to_bytes( &self, batch: &RecordBatch, @@ -760,8 +764,6 @@ impl IpcDataGenerator { let root = message.finish(); fbb.finish(root, None); - // The finished metadata lives in `fbb`'s internal buffer; callers read it - // via `fbb.finished_data()` to avoid an intermediate copy. Ok((body_len, tail_pad)) } From fbf8fb5d3c81a32a5595ddb380bfc217f051ca9d Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:18:30 -0700 Subject: [PATCH 07/19] Trim down parameter drilling --- arrow-ipc/src/writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 3b0d2fdd65c1..f9c0083dd3c5 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -808,8 +808,6 @@ impl IpcDataGenerator { sink, &mut nodes, 0, - dict.data.len(), - dict.data.null_count(), compression_codec, compression_context, write_options, @@ -2044,6 +2042,7 @@ fn write_array_data( ) -> Result { let mut offset = offset; let num_rows = array_data.len(); + let null_count = array_data.null_count(); if !matches!(array_data.data_type(), DataType::Null) { meta.nodes.push(crate::FieldNode::new( num_rows as i64, From 87ff7d294f0d46a33da8627ec4bc7b9f1e26fa9b Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:25:53 -0700 Subject: [PATCH 08/19] Move function --- arrow-ipc/src/writer.rs | 50 ++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index f9c0083dd3c5..77ccfff792da 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -767,6 +767,31 @@ impl IpcDataGenerator { Ok((body_len, tail_pad)) } + /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the + /// other for the data + fn dictionary_batch_to_bytes( + &self, + dict: DictionaryToEncode, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + ) -> Result { + let mut fbb = FlatBufferBuilder::new(); + let mut arrow_data: Vec = vec![]; + let (_, tail_pad) = self.dictionary_batch_to_sink( + &dict, + write_options, + compression_context, + &mut IpcBodySink::Write(&mut arrow_data), + &mut fbb, + )?; + arrow_data.extend_from_slice(&PADDING[..tail_pad]); + + Ok(EncodedData { + ipc_message: fbb.finished_data().to_vec(), + arrow_data, + }) + } + /// Encodes a dictionary batch's flatbuffer metadata into `fbb` (read back via /// `fbb.finished_data()`) and fills `sink` with its body buffers. /// @@ -863,31 +888,6 @@ impl IpcDataGenerator { Ok((body_len, tail_pad)) } - - /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the - /// other for the data - fn dictionary_batch_to_bytes( - &self, - dict: DictionaryToEncode, - write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, - ) -> Result { - let mut fbb = FlatBufferBuilder::new(); - let mut arrow_data: Vec = vec![]; - let (_, tail_pad) = self.dictionary_batch_to_sink( - &dict, - write_options, - compression_context, - &mut IpcBodySink::Write(&mut arrow_data), - &mut fbb, - )?; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); - - Ok(EncodedData { - ipc_message: fbb.finished_data().to_vec(), - arrow_data, - }) - } } fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { From b45edebcf26cbcba4718e9c69d3a03465f0660c4 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:31:46 -0700 Subject: [PATCH 09/19] Remove unnecessary re-borrows --- arrow-ipc/src/writer.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 77ccfff792da..d0ad5bfc964e 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -703,7 +703,7 @@ impl IpcDataGenerator { let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut *fbb); + let mut c = crate::BodyCompressionBuilder::new(fbb); c.add_method(crate::BodyCompressionMethod::BUFFER); c.add_codec(batch_compression_type); c.finish() @@ -743,7 +743,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut *fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -756,7 +756,7 @@ impl IpcDataGenerator { batch_builder.finish().as_union_value() }; // create an crate::Message - let mut message = crate::MessageBuilder::new(&mut *fbb); + let mut message = crate::MessageBuilder::new(fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); @@ -854,7 +854,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut *fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); batch_builder.add_length(dict.data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -868,7 +868,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut *fbb); + let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb); batch_builder.add_id(dict.id); batch_builder.add_data(root); batch_builder.add_isDelta(dict.is_delta); @@ -876,7 +876,7 @@ impl IpcDataGenerator { }; let root = { - let mut message_builder = crate::MessageBuilder::new(&mut *fbb); + let mut message_builder = crate::MessageBuilder::new(fbb); message_builder.add_version(write_options.metadata_version); message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); message_builder.add_bodyLength(body_len as i64); From 19c5f34efd8609d5c001e3c1a8876f96f5bb7f52 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:50:41 -0700 Subject: [PATCH 10/19] Unify record batch and dictionary batch encoding --- arrow-ipc/src/writer.rs | 207 +++++++++++++++++++--------------------- 1 file changed, 98 insertions(+), 109 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index d0ad5bfc964e..c65664a0801e 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -700,67 +700,21 @@ impl IpcDataGenerator { // allocated capacity and only clears the in-progress state. fbb.reset(); - let batch_compression_type = write_options.batch_compression_type; - - let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(fbb); - c.add_method(crate::BodyCompressionMethod::BUFFER); - c.add_codec(batch_compression_type); - c.finish() - }); - - let compression_codec: Option = - batch_compression_type.map(TryInto::try_into).transpose()?; - - let alignment = write_options.alignment; - let mut variadic_buffer_counts = vec![]; - let mut meta = IpcMetadataBuilder::default(); - let mut offset = 0i64; - - for array in batch.columns() { - let array_data = array.to_data(); - offset = write_array_data( - &array_data, - &mut meta, - sink, - offset, - compression_codec, - compression_context, - write_options, - )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); - } - - let tail_pad = pad_to_alignment(alignment, offset as usize); - let body_len = offset as usize + tail_pad; - - let buffers = fbb.create_vector(&meta.buffers); - let nodes = fbb.create_vector(&meta.nodes); - let variadic_buffer = if variadic_buffer_counts.is_empty() { - None - } else { - Some(fbb.create_vector(&variadic_buffer_counts)) - }; + let (root, body_len, tail_pad) = self.encode_record_batch_data( + batch.columns().iter().map(|array| array.to_data()), + batch.num_rows() as i64, + write_options, + compression_context, + sink, + fbb, + )?; - let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(fbb); - batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - if let Some(c) = compression { - batch_builder.add_compression(c); - } - if let Some(v) = variadic_buffer { - batch_builder.add_variadicBufferCounts(v); - } - batch_builder.finish().as_union_value() - }; // create an crate::Message let mut message = crate::MessageBuilder::new(fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); - message.add_header(root); + message.add_header(root.as_union_value()); let root = message.finish(); fbb.finish(root, None); @@ -809,10 +763,63 @@ impl IpcDataGenerator { // allocated capacity and only clears the in-progress state. fbb.reset(); - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; + // A dictionary batch is a record batch (the single column of dictionary + // values) wrapped in a DictionaryBatch, so we share the record batch body + // and table encoding and only differ in the message framing. + let (record_batch, body_len, tail_pad) = self.encode_record_batch_data( + std::iter::once(dict.data.clone()), + dict.data.len() as i64, + write_options, + compression_context, + sink, + fbb, + )?; + + let root = { + let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb); + batch_builder.add_id(dict.id); + batch_builder.add_data(record_batch); + batch_builder.add_isDelta(dict.is_delta); + batch_builder.finish().as_union_value() + }; + + let root = { + let mut message_builder = crate::MessageBuilder::new(fbb); + message_builder.add_version(write_options.metadata_version); + message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); + message_builder.add_bodyLength(body_len as i64); + message_builder.add_header(root); + message_builder.finish() + }; + + fbb.finish(root, None); + + Ok((body_len, tail_pad)) + } - // get the type of compression + /// Encode the body buffers of a single record batch into `sink` and build the + /// flatbuffer `RecordBatch` table into `fbb`. + /// + /// This is the shared core of [`Self::record_batch_to_bytes`] and + /// [`Self::dictionary_batch_to_sink`]: a dictionary message embeds a record + /// batch (its single column of values), so both encode the same `RecordBatch` + /// table and only differ in how they wrap the returned offset in a message. + /// + /// `columns` yields the column data in IPC buffer order and `row_count` is the + /// logical length recorded in the `RecordBatch` header. Returns + /// `(record_batch, body_len, tail_pad)`: the in-progress flatbuffer offset of the + /// `RecordBatch` table, the total body length including trailing padding, and the + /// trailing alignment padding byte count. The caller is responsible for wrapping + /// `record_batch` in a `Message` and calling [`FlatBufferBuilder::finish`]. + fn encode_record_batch_data( + &self, + columns: impl IntoIterator, + row_count: i64, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + sink: &mut IpcBodySink<'_>, + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result<(flatbuffers::WIPOffset>, usize, usize), ArrowError> { let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -822,71 +829,53 @@ impl IpcDataGenerator { c.finish() }); - let compression_codec: Option = batch_compression_type - .map(|batch_compression_type| batch_compression_type.try_into()) - .transpose()?; + let compression_codec: Option = + batch_compression_type.map(TryInto::try_into).transpose()?; let alignment = write_options.alignment; - let offset = write_array_data( - &dict.data, - &mut buffers, - sink, - &mut nodes, - 0, - compression_codec, - compression_context, - write_options, - )?; + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; + let mut variadic_buffer_counts: Vec = vec![]; + let mut offset = 0i64; - let mut variadic_buffer_counts = vec![]; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &dict.data); + for array_data in columns { + offset = write_array_data( + &array_data, + &mut buffers, + sink, + &mut nodes, + offset, + compression_codec, + compression_context, + write_options, + )?; + append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); + } let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; - // write data - let buffers = fbb.create_vector(&meta.buffers); - let nodes = fbb.create_vector(&meta.nodes); + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { Some(fbb.create_vector(&variadic_buffer_counts)) }; - let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(fbb); - batch_builder.add_length(dict.data.len() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - if let Some(c) = compression { - batch_builder.add_compression(c); - } - if let Some(v) = variadic_buffer { - batch_builder.add_variadicBufferCounts(v); - } - batch_builder.finish() - }; - - let root = { - let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb); - batch_builder.add_id(dict.id); - batch_builder.add_data(root); - batch_builder.add_isDelta(dict.is_delta); - batch_builder.finish().as_union_value() - }; - - let root = { - let mut message_builder = crate::MessageBuilder::new(fbb); - message_builder.add_version(write_options.metadata_version); - message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(body_len as i64); - message_builder.add_header(root); - message_builder.finish() - }; - - fbb.finish(root, None); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); + batch_builder.add_length(row_count); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } + if let Some(v) = variadic_buffer { + batch_builder.add_variadicBufferCounts(v); + } + let root = batch_builder.finish(); - Ok((body_len, tail_pad)) + Ok((root, body_len, tail_pad)) } } From 8dcf8438eecd2ec24f6dac71a2c17e6047889008 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 17:59:20 -0700 Subject: [PATCH 11/19] Unify write messages --- arrow-ipc/src/writer.rs | 111 +++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index c65664a0801e..362cbcb08ea6 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -281,6 +281,21 @@ struct DictionaryToEncode { is_delta: bool, } +/// The result of encoding a record batch's body and flatbuffer `RecordBatch` table +/// via [`IpcDataGenerator::encode_record_batch_data`]. +/// +/// The `RecordBatch` table is left in-progress in the caller's [`FlatBufferBuilder`]; +/// the caller wraps `record_batch` in a `Message` (directly for a record batch +/// message, or inside a `DictionaryBatch` for a dictionary message). +struct EncodedRecordBatch { + /// In-progress flatbuffer offset of the `RecordBatch` table. + record_batch: flatbuffers::WIPOffset>, + /// Total body length written to the sink, including trailing alignment padding. + body_len: usize, + /// Trailing alignment padding byte count (already included in `body_len`). + tail_pad: usize, +} + impl IpcDataGenerator { /// Converts a schema to an IPC message along with `dictionary_tracker` /// and returns it encoded inside [EncodedData] as a flatbuffer. @@ -700,7 +715,11 @@ impl IpcDataGenerator { // allocated capacity and only clears the in-progress state. fbb.reset(); - let (root, body_len, tail_pad) = self.encode_record_batch_data( + let EncodedRecordBatch { + record_batch, + body_len, + tail_pad, + } = self.encode_record_batch_data( batch.columns().iter().map(|array| array.to_data()), batch.num_rows() as i64, write_options, @@ -714,7 +733,7 @@ impl IpcDataGenerator { message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); - message.add_header(root.as_union_value()); + message.add_header(record_batch.as_union_value()); let root = message.finish(); fbb.finish(root, None); @@ -766,7 +785,11 @@ impl IpcDataGenerator { // A dictionary batch is a record batch (the single column of dictionary // values) wrapped in a DictionaryBatch, so we share the record batch body // and table encoding and only differ in the message framing. - let (record_batch, body_len, tail_pad) = self.encode_record_batch_data( + let EncodedRecordBatch { + record_batch, + body_len, + tail_pad, + } = self.encode_record_batch_data( std::iter::once(dict.data.clone()), dict.data.len() as i64, write_options, @@ -806,11 +829,9 @@ impl IpcDataGenerator { /// table and only differ in how they wrap the returned offset in a message. /// /// `columns` yields the column data in IPC buffer order and `row_count` is the - /// logical length recorded in the `RecordBatch` header. Returns - /// `(record_batch, body_len, tail_pad)`: the in-progress flatbuffer offset of the - /// `RecordBatch` table, the total body length including trailing padding, and the - /// trailing alignment padding byte count. The caller is responsible for wrapping - /// `record_batch` in a `Message` and calling [`FlatBufferBuilder::finish`]. + /// logical length recorded in the `RecordBatch` header. The caller is responsible + /// for wrapping the returned [`EncodedRecordBatch::record_batch`] in a `Message` + /// and calling [`FlatBufferBuilder::finish`]. fn encode_record_batch_data( &self, columns: impl IntoIterator, @@ -819,7 +840,7 @@ impl IpcDataGenerator { compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, fbb: &mut FlatBufferBuilder<'static>, - ) -> Result<(flatbuffers::WIPOffset>, usize, usize), ArrowError> { + ) -> Result { let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -873,9 +894,13 @@ impl IpcDataGenerator { if let Some(v) = variadic_buffer { batch_builder.add_variadicBufferCounts(v); } - let root = batch_builder.finish(); + let record_batch = batch_builder.finish(); - Ok((root, body_len, tail_pad)) + Ok(EncodedRecordBatch { + record_batch, + body_len, + tail_pad, + }) } } @@ -1732,9 +1757,9 @@ pub struct EncodedData { /// Arrow buffers to be written, should be an empty vec for schema messages pub arrow_data: Vec, } -/// Write a single message directly to `writer`: the continuation prefix, the -/// flatbuffer metadata (`ipc_message`) with alignment padding, then each body -/// buffer with its per-buffer padding, then the trailing body padding. +/// Write a single message directly to `writer`: the message header (continuation +/// prefix + flatbuffer metadata + alignment padding), then each body buffer with its +/// per-buffer padding, then the trailing body padding. /// /// `ipc_message` is the raw flatbuffer header (without continuation prefix), and /// `encoded_buffers` are the already-encoded body buffers (zero-copy or @@ -1747,21 +1772,9 @@ fn write_encoded_message_direct( tail_pad: usize, write_options: &IpcWriteOptions, ) -> Result { + let aligned_size = write_message_header(writer, ipc_message, write_options)?; + let alignment = write_options.alignment; - let a = usize::from(alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; - let aligned_size = (ipc_message.len() + prefix_size + a) & !a; - write_continuation( - &mut *writer, - write_options, - (aligned_size - prefix_size) as i32, - )?; - writer.write_all(ipc_message)?; - writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; for enc in encoded_buffers { writer.write_all(enc.as_slice())?; writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; @@ -1783,38 +1796,52 @@ pub fn write_message( )); } + let aligned_size = write_message_header(&mut writer, &encoded.ipc_message, write_options)?; + + // write arrow data + let body_len = if arrow_data_len > 0 { + write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? + } else { + 0 + }; + + Ok((aligned_size, body_len)) +} + +/// Write the encapsulated-message header to `writer`: the continuation marker and +/// metadata length, the flatbuffer metadata (`ipc_message`), and the alignment +/// padding that follows it. +/// +/// Returns the padded header length (continuation prefix + metadata + padding), +/// which is also the value encoded in the continuation length field. +fn write_message_header( + writer: &mut W, + ipc_message: &[u8], + write_options: &IpcWriteOptions, +) -> Result { let a = usize::from(write_options.alignment - 1); - let buffer = encoded.ipc_message; - let flatbuf_size = buffer.len(); + let flatbuf_size = ipc_message.len(); let prefix_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; write_continuation( - &mut writer, + &mut *writer, write_options, (aligned_size - prefix_size) as i32, )?; // write the flatbuf if flatbuf_size > 0 { - writer.write_all(&buffer)?; + writer.write_all(ipc_message)?; } // write padding - writer.write_all(&PADDING[..padding_bytes])?; - - // write arrow data - let body_len = if arrow_data_len > 0 { - write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? - } else { - 0 - }; + writer.write_all(&PADDING[..aligned_size - flatbuf_size - prefix_size])?; - Ok((aligned_size, body_len)) + Ok(aligned_size) } fn write_body_buffers( From a4dbf810c86b7f156b72ae19566ba4c5609be1c9 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 18:19:05 -0700 Subject: [PATCH 12/19] c --- arrow-ipc/src/writer.rs | 56 ++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 362cbcb08ea6..19054248050c 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -290,10 +290,10 @@ struct DictionaryToEncode { struct EncodedRecordBatch { /// In-progress flatbuffer offset of the `RecordBatch` table. record_batch: flatbuffers::WIPOffset>, - /// Total body length written to the sink, including trailing alignment padding. + /// Total body length written to the sink (the sum of each buffer's encoded + /// length and its alignment padding). Each buffer is individually padded to the + /// alignment, so the body is already aligned and needs no trailing padding. body_len: usize, - /// Trailing alignment padding byte count (already included in `body_len`). - tail_pad: usize, } impl IpcDataGenerator { @@ -572,14 +572,13 @@ impl IpcDataGenerator { } let mut fbb = FlatBufferBuilder::new(); let mut arrow_data = Vec::new(); - let (_, tail_pad) = self.record_batch_to_bytes( + self.record_batch_to_bytes( batch, write_options, compression_context, &mut IpcBodySink::Write(&mut arrow_data), &mut fbb, )?; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); Ok(( encoded_dictionaries, EncodedData { @@ -636,7 +635,7 @@ impl IpcDataGenerator { for dict in &dictionaries { encoded_buffers.clear(); - let (body_len, tail_pad) = self.dictionary_batch_to_sink( + let body_len = self.dictionary_batch_to_sink( dict, write_options, compression_context, @@ -647,14 +646,13 @@ impl IpcDataGenerator { &mut *writer, fbb.finished_data(), &encoded_buffers, - tail_pad, write_options, )?; dictionary_block_sizes.push((header_len, body_len)); } encoded_buffers.clear(); - let (body_len, tail_pad) = self.record_batch_to_bytes( + let body_len = self.record_batch_to_bytes( batch, write_options, compression_context, @@ -665,7 +663,6 @@ impl IpcDataGenerator { &mut *writer, fbb.finished_data(), &encoded_buffers, - tail_pad, write_options, )?; @@ -697,8 +694,8 @@ impl IpcDataGenerator { /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the /// serialised buffer data. /// - /// Returns `(body_len, tail_pad)`: the total body length including trailing padding, - /// and the trailing alignment padding byte count. + /// Returns the total body length written to `sink` (including per-buffer alignment + /// padding). /// /// The message header is located in the provided FlatBufferBuilder's finished /// bytes. A successful Result from this function guarantees the fbb is in @@ -710,7 +707,7 @@ impl IpcDataGenerator { compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, fbb: &mut FlatBufferBuilder<'static>, - ) -> Result<(usize, usize), ArrowError> { + ) -> Result { // Reuse the builder's internal buffer across messages; `reset` keeps the // allocated capacity and only clears the in-progress state. fbb.reset(); @@ -718,7 +715,6 @@ impl IpcDataGenerator { let EncodedRecordBatch { record_batch, body_len, - tail_pad, } = self.encode_record_batch_data( batch.columns().iter().map(|array| array.to_data()), batch.num_rows() as i64, @@ -737,7 +733,7 @@ impl IpcDataGenerator { let root = message.finish(); fbb.finish(root, None); - Ok((body_len, tail_pad)) + Ok(body_len) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -750,14 +746,13 @@ impl IpcDataGenerator { ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut arrow_data: Vec = vec![]; - let (_, tail_pad) = self.dictionary_batch_to_sink( + self.dictionary_batch_to_sink( &dict, write_options, compression_context, &mut IpcBodySink::Write(&mut arrow_data), &mut fbb, )?; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); Ok(EncodedData { ipc_message: fbb.finished_data().to_vec(), @@ -768,8 +763,8 @@ impl IpcDataGenerator { /// Encodes a dictionary batch's flatbuffer metadata into `fbb` (read back via /// `fbb.finished_data()`) and fills `sink` with its body buffers. /// - /// Returns `(body_len, tail_pad)`: the total body length including trailing - /// padding, and the trailing alignment padding byte count. + /// Returns the total body length written to `sink` (including per-buffer alignment + /// padding). fn dictionary_batch_to_sink( &self, dict: &DictionaryToEncode, @@ -777,7 +772,7 @@ impl IpcDataGenerator { compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, fbb: &mut FlatBufferBuilder<'static>, - ) -> Result<(usize, usize), ArrowError> { + ) -> Result { // Reuse the builder's internal buffer across messages; `reset` keeps the // allocated capacity and only clears the in-progress state. fbb.reset(); @@ -788,7 +783,6 @@ impl IpcDataGenerator { let EncodedRecordBatch { record_batch, body_len, - tail_pad, } = self.encode_record_batch_data( std::iter::once(dict.data.clone()), dict.data.len() as i64, @@ -817,7 +811,7 @@ impl IpcDataGenerator { fbb.finish(root, None); - Ok((body_len, tail_pad)) + Ok(body_len) } /// Encode the body buffers of a single record batch into `sink` and build the @@ -853,7 +847,6 @@ impl IpcDataGenerator { let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; - let alignment = write_options.alignment; let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; let mut variadic_buffer_counts: Vec = vec![]; @@ -873,8 +866,9 @@ impl IpcDataGenerator { append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } - let tail_pad = pad_to_alignment(alignment, offset as usize); - let body_len = offset as usize + tail_pad; + // Each buffer is padded to the alignment as it is written, so `offset` is + // already a multiple of the alignment -- the body needs no trailing padding. + let body_len = offset as usize; let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); @@ -899,7 +893,6 @@ impl IpcDataGenerator { Ok(EncodedRecordBatch { record_batch, body_len, - tail_pad, }) } } @@ -1759,17 +1752,17 @@ pub struct EncodedData { } /// Write a single message directly to `writer`: the message header (continuation /// prefix + flatbuffer metadata + alignment padding), then each body buffer with its -/// per-buffer padding, then the trailing body padding. +/// per-buffer alignment padding. /// /// `ipc_message` is the raw flatbuffer header (without continuation prefix), and /// `encoded_buffers` are the already-encoded body buffers (zero-copy or -/// compressed). Returns the padded header length (continuation + metadata + -/// padding), needed for IPC footer blocks. +/// compressed). Each buffer is padded to the alignment, so the body needs no trailing +/// padding. Returns the padded header length (continuation + metadata + padding), +/// needed for IPC footer blocks. fn write_encoded_message_direct( writer: &mut W, ipc_message: &[u8], encoded_buffers: &[EncodedBuffer], - tail_pad: usize, write_options: &IpcWriteOptions, ) -> Result { let aligned_size = write_message_header(writer, ipc_message, write_options)?; @@ -1779,7 +1772,6 @@ fn write_encoded_message_direct( writer.write_all(enc.as_slice())?; writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; } - writer.write_all(&PADDING[..tail_pad])?; Ok(aligned_size) } @@ -1800,7 +1792,7 @@ pub fn write_message( // write arrow data let body_len = if arrow_data_len > 0 { - write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? + write_body_buffer(&mut writer, &encoded.arrow_data, write_options.alignment)? } else { 0 }; @@ -1844,7 +1836,7 @@ fn write_message_header( Ok(aligned_size) } -fn write_body_buffers( +fn write_body_buffer( mut writer: W, data: &[u8], alignment: u8, From eb82e9ace0f665dc29fdb86dfbac758f30c38232 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Wed, 10 Jun 2026 18:29:06 -0700 Subject: [PATCH 13/19] c --- arrow-ipc/src/writer.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 19054248050c..dcaee964c5c1 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -89,13 +89,6 @@ impl EncodedBuffer { EncodedBuffer::Compressed(v) => v.as_slice(), } } - - fn len(&self) -> usize { - match self { - EncodedBuffer::Raw(b) => b.len(), - EncodedBuffer::Compressed(v) => v.len(), - } - } } /// Accumulates the IPC metadata produced by [`write_array_data`]. /// @@ -1769,8 +1762,7 @@ fn write_encoded_message_direct( let alignment = write_options.alignment; for enc in encoded_buffers { - writer.write_all(enc.as_slice())?; - writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; + write_body_buffer(&mut *writer, enc.as_slice(), alignment)?; } Ok(aligned_size) } From a928020fc1a599fec801f0cc472fad24e3ca43eb Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 09:17:18 -0700 Subject: [PATCH 14/19] c --- arrow-ipc/benches/ipc_writer.rs | 69 ++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index 5c17d6e8d617..e0d03f385094 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -16,10 +16,9 @@ // under the License. use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; -use arrow_array::types::Int32Type; use arrow_array::{DictionaryArray, RecordBatch, builder::StringBuilder}; use arrow_ipc::CompressionType; -use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; +use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter}; use arrow_schema::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use std::sync::Arc; @@ -71,11 +70,8 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - // Each batch carries a distinct dictionary, so every `write` emits a - // dictionary (replacement) message followed by the record batch. This - // exercises the dictionary-message encoding path. group.bench_function("StreamWriter/write_10/dict", |b| { - let batches = create_dict_batches(10, 8192, 2048); + let batches = create_dict_batches(10, 8192); let schema = batches[0].schema(); let mut buffer = Vec::with_capacity(2 * 1024 * 1024); b.iter(move || { @@ -87,28 +83,57 @@ fn criterion_benchmark(c: &mut Criterion) { writer.finish().unwrap(); }) }); + + group.bench_function("StreamWriter/write_10/dict/delta", |b| { + let batches = create_dict_batches(10, 8192); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + let options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + b.iter(move || { + buffer.clear(); + + let mut writer = + StreamWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone()) + .unwrap(); + + for batch in &batches { + writer.write(batch).unwrap(); + } + + writer.finish().unwrap(); + }) + }); } -/// Build `n` record batches, each with a single `Dictionary(Int32, Utf8)` -/// column. Each batch uses a distinct set of dictionary values so that the -/// writer must emit a fresh dictionary message for every batch. -fn create_dict_batches(n: usize, num_rows: usize, dict_len: usize) -> Vec { +fn create_dict_batches(n: usize, num_rows: usize) -> Vec { let schema = Arc::new(Schema::new(vec![Field::new( "d0", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), false, )])); - (0..n) - .map(|batch_idx| { - // Distinct values per batch force a new dictionary message each time. - let values: Vec = (0..dict_len) - .map(|v| format!("batch {batch_idx} dictionary value number {v}")) - .collect(); - let keys = (0..num_rows).map(|i| values[i % dict_len].as_str()); - let dict: DictionaryArray = keys.collect(); - RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap() - }) - .collect() + + let mut batches = Vec::with_capacity(n); + for i in 0..n { + // Half of the keys will be present in every batch and have stable values + // The other half of the keys will be specific to each batch. This gives us + // some delta opportunities and does not require dictionary replacement. + let keys = 0..(num_rows / 2); + let keys2 = (0..num_rows - keys.len()).map(|x| (n * i) + x); + let keys: Vec = keys.chain(keys2).map(|x| x as u32).collect(); + + let values = (0..num_rows / 2).map(|x| format!("Value {x}")); + let values2 = (0..num_rows - values.len()).map(|x| format!("Value {i}{x}")); + let mut builder = StringBuilder::new(); + values.chain(values2).for_each(|s| builder.append_value(s)); + let values = builder.finish(); + + let a = DictionaryArray::new(keys.into(), Arc::new(values)); + batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(a)]).unwrap()); + } + + batches } fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { From 949d23c3caabcc02bea8985b24b3db8ca75ac895 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 09:30:01 -0700 Subject: [PATCH 15/19] c --- arrow-ipc/benches/ipc_writer.rs | 72 +++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index e0d03f385094..a6ec10474185 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; -use arrow_array::{DictionaryArray, RecordBatch, builder::StringBuilder}; +use arrow_array::RecordBatch; +use arrow_array::builder::{ + Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder, +}; +use arrow_array::types::UInt32Type; use arrow_ipc::CompressionType; use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter}; use arrow_schema::{DataType, Field, Schema}; @@ -71,7 +74,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); group.bench_function("StreamWriter/write_10/dict", |b| { - let batches = create_dict_batches(10, 8192); + let batches = create_unique_dict_batches(10, 8192); let schema = batches[0].schema(); let mut buffer = Vec::with_capacity(2 * 1024 * 1024); b.iter(move || { @@ -85,7 +88,7 @@ fn criterion_benchmark(c: &mut Criterion) { }); group.bench_function("StreamWriter/write_10/dict/delta", |b| { - let batches = create_dict_batches(10, 8192); + let batches = create_delta_dict_batches(10, 8192); let schema = batches[0].schema(); let mut buffer = Vec::with_capacity(2 * 1024 * 1024); let options = @@ -107,30 +110,55 @@ fn criterion_benchmark(c: &mut Criterion) { }); } -fn create_dict_batches(n: usize, num_rows: usize) -> Vec { - let schema = Arc::new(Schema::new(vec![Field::new( +fn dict_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new( "d0", DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), false, - )])); + )])) +} + +/// Build `n` record batches with a single `Dictionary(UInt32, Utf8)` column whose +/// dictionary grows across batches. A single builder is reused and +/// `finish_preserve_values` so each batch's dictionary has the previous batch's +/// as a prefix which allows `DictionaryHandling::Delta` emit delta messages instead +/// of full replacements. +fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { + let schema = dict_schema(); + let mut builder = StringDictionaryBuilder::::new(); let mut batches = Vec::with_capacity(n); for i in 0..n { - // Half of the keys will be present in every batch and have stable values - // The other half of the keys will be specific to each batch. This gives us - // some delta opportunities and does not require dictionary replacement. - let keys = 0..(num_rows / 2); - let keys2 = (0..num_rows - keys.len()).map(|x| (n * i) + x); - let keys: Vec = keys.chain(keys2).map(|x| x as u32).collect(); - - let values = (0..num_rows / 2).map(|x| format!("Value {x}")); - let values2 = (0..num_rows - values.len()).map(|x| format!("Value {i}{x}")); - let mut builder = StringBuilder::new(); - values.chain(values2).for_each(|s| builder.append_value(s)); - let values = builder.finish(); - - let a = DictionaryArray::new(keys.into(), Arc::new(values)); - batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(a)]).unwrap()); + // 1/4 of the rows reuse values shared by every batch (deduped to existing keys); + // the other half introduce values unique to this batch (extend the dictionary). + for r in 0..num_rows { + if r < num_rows / 4 { + builder.append_value(format!("batch {i} value {}", r - num_rows / 2)); + } else { + builder.append_value(format!("shared {r}")); + } + } + // Preserve the values builder so the dictionary accumulates across batches. + let dict = builder.finish_preserve_values(); + batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap()); + } + + batches +} + +/// Build `n` record batches each with a completely distinct `Dictionary(UInt32, Utf8)` +/// dictionary for each batch. +fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec { + let schema = dict_schema(); + + let mut batches = Vec::with_capacity(n); + for i in 0..n { + let mut builder = StringDictionaryBuilder::::new(); + for r in 0..num_rows { + builder.append_value(format!("batch {i} value {}", r % (num_rows / 2))); + } + let dict = builder.finish(); + batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap()); } batches From 67689c93492538c470bacdb99ad7ab12dd336708 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 09:42:20 -0700 Subject: [PATCH 16/19] Update comments --- arrow-ipc/benches/ipc_writer.rs | 40 +++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index a6ec10474185..21debb2e1256 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -108,6 +108,30 @@ fn criterion_benchmark(c: &mut Criterion) { writer.finish().unwrap(); }) }); + + // The file writer rejects dictionary replacement, so only the delta case is + // exercised here (growing dictionaries that are prefixes of one another). + group.bench_function("FileWriter/write_10/dict/delta", |b| { + let batches = create_delta_dict_batches(10, 8192); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + let options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + b.iter(move || { + buffer.clear(); + + let mut writer = + FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone()) + .unwrap(); + + for batch in &batches { + writer.write(batch).unwrap(); + } + + writer.finish().unwrap(); + }) + }); } fn dict_schema() -> Arc { @@ -119,21 +143,20 @@ fn dict_schema() -> Arc { } /// Build `n` record batches with a single `Dictionary(UInt32, Utf8)` column whose -/// dictionary grows across batches. A single builder is reused and -/// `finish_preserve_values` so each batch's dictionary has the previous batch's -/// as a prefix which allows `DictionaryHandling::Delta` emit delta messages instead -/// of full replacements. +/// dictionary grows across batches. A single builder is reused with `finish_preserve_values` +/// so each batch's dictionary has the previous batch's as a prefix which allows +/// us to emit deltas. fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { let schema = dict_schema(); let mut builder = StringDictionaryBuilder::::new(); let mut batches = Vec::with_capacity(n); for i in 0..n { - // 1/4 of the rows reuse values shared by every batch (deduped to existing keys); - // the other half introduce values unique to this batch (extend the dictionary). + // 3/4 of the rows reuse values shared by every batch (deduped to existing keys); + // the other 1/4 introduce values unique to this batch (extend the dictionary). for r in 0..num_rows { if r < num_rows / 4 { - builder.append_value(format!("batch {i} value {}", r - num_rows / 2)); + builder.append_value(format!("batch {i} value {}", r)); } else { builder.append_value(format!("shared {r}")); } @@ -146,8 +169,7 @@ fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { batches } -/// Build `n` record batches each with a completely distinct `Dictionary(UInt32, Utf8)` -/// dictionary for each batch. +/// Build `n` record batches each with a completely distinct dictionary for each batch. fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec { let schema = dict_schema(); From 83ca936f03f88898fbde546118aa9b3adb552be7 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 09:56:45 -0700 Subject: [PATCH 17/19] update benches --- arrow-ipc/benches/ipc_writer.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index 21debb2e1256..5050ff6cd112 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -134,26 +134,22 @@ fn criterion_benchmark(c: &mut Criterion) { }); } -fn dict_schema() -> Arc { - Arc::new(Schema::new(vec![Field::new( - "d0", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), - false, - )])) -} - -/// Build `n` record batches with a single `Dictionary(UInt32, Utf8)` column whose -/// dictionary grows across batches. A single builder is reused with `finish_preserve_values` +/// Build `n` record batches with a single dictionary column whose dictionary +/// grows across batches. A single builder is reused with `finish_preserve_values` /// so each batch's dictionary has the previous batch's as a prefix which allows /// us to emit deltas. fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { - let schema = dict_schema(); + let schema = Arc::new(Schema::new(vec![Field::new( + "d0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + )])); let mut builder = StringDictionaryBuilder::::new(); let mut batches = Vec::with_capacity(n); for i in 0..n { - // 3/4 of the rows reuse values shared by every batch (deduped to existing keys); - // the other 1/4 introduce values unique to this batch (extend the dictionary). + // 3/4 of the rows reuse values shared by every batch, the other 1/4 + // introduce values unique to this batch which extends the dictionary. for r in 0..num_rows { if r < num_rows / 4 { builder.append_value(format!("batch {i} value {}", r)); @@ -161,6 +157,7 @@ fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { builder.append_value(format!("shared {r}")); } } + // Preserve the values builder so the dictionary accumulates across batches. let dict = builder.finish_preserve_values(); batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap()); @@ -171,7 +168,11 @@ fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { /// Build `n` record batches each with a completely distinct dictionary for each batch. fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec { - let schema = dict_schema(); + let schema = Arc::new(Schema::new(vec![Field::new( + "d0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + )])); let mut batches = Vec::with_capacity(n); for i in 0..n { From 7ae4f34edfce32168b6ca5f2c68545c27abe8533 Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 15:53:33 -0700 Subject: [PATCH 18/19] fix up docs --- arrow-ipc/src/writer.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index dcaee964c5c1..bceb9371b92e 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -135,7 +135,7 @@ struct IpcWriteMetadata { dictionary_block_sizes: Vec<(usize, usize)>, /// Flatbuffer header size including continuation prefix and alignment padding. padded_header_len: usize, - /// Total length of the record-batch body including trailing alignment padding. + /// Total length of the record-batch body including alignment padding body_len: usize, } @@ -628,6 +628,7 @@ impl IpcDataGenerator { for dict in &dictionaries { encoded_buffers.clear(); + let body_len = self.dictionary_batch_to_sink( dict, write_options, @@ -635,7 +636,8 @@ impl IpcDataGenerator { &mut IpcBodySink::Collect(&mut encoded_buffers), fbb, )?; - let header_len = write_encoded_message_direct( + + let header_len = write_buffered_message( &mut *writer, fbb.finished_data(), &encoded_buffers, @@ -652,7 +654,7 @@ impl IpcDataGenerator { &mut IpcBodySink::Collect(&mut encoded_buffers), fbb, )?; - let padded_header_len = write_encoded_message_direct( + let padded_header_len = write_buffered_message( &mut *writer, fbb.finished_data(), &encoded_buffers, @@ -899,7 +901,7 @@ fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { } DataType::Dictionary(_, _) => { // Do nothing - // Dictionary types are handled in `encode_dictionaries`. + // Dictionary values are handled in special dictionary messages } _ => { for child in array.child_data() { @@ -1391,7 +1393,7 @@ impl FileWriter { } // write EOS - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation_and_meta_size(&mut self.writer, &self.write_options, 0)?; let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -1652,7 +1654,7 @@ impl StreamWriter { )); } - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation_and_meta_size(&mut self.writer, &self.write_options, 0)?; self.writer.flush()?; self.finished = true; @@ -1743,6 +1745,7 @@ pub struct EncodedData { /// Arrow buffers to be written, should be an empty vec for schema messages pub arrow_data: Vec, } + /// Write a single message directly to `writer`: the message header (continuation /// prefix + flatbuffer metadata + alignment padding), then each body buffer with its /// per-buffer alignment padding. @@ -1752,7 +1755,7 @@ pub struct EncodedData { /// compressed). Each buffer is padded to the alignment, so the body needs no trailing /// padding. Returns the padded header length (continuation + metadata + padding), /// needed for IPC footer blocks. -fn write_encoded_message_direct( +fn write_buffered_message( writer: &mut W, ipc_message: &[u8], encoded_buffers: &[EncodedBuffer], @@ -1812,7 +1815,7 @@ fn write_message_header( }; let aligned_size = (flatbuf_size + prefix_size + a) & !a; - write_continuation( + write_continuation_and_meta_size( &mut *writer, write_options, (aligned_size - prefix_size) as i32, @@ -1822,6 +1825,7 @@ fn write_message_header( if flatbuf_size > 0 { writer.write_all(ipc_message)?; } + // write padding writer.write_all(&PADDING[..aligned_size - flatbuf_size - prefix_size])?; @@ -1848,7 +1852,7 @@ fn write_body_buffer( /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream -fn write_continuation( +fn write_continuation_and_meta_size( mut writer: W, write_options: &IpcWriteOptions, total_len: i32, From 78e917bc0b4bf694329557a2001265851a5d699a Mon Sep 17 00:00:00 2001 From: Jake Dern Date: Thu, 11 Jun 2026 16:31:50 -0700 Subject: [PATCH 19/19] Fix rebase issues --- arrow-ipc/src/writer.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index bceb9371b92e..81c0093cdebf 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -842,17 +842,15 @@ impl IpcDataGenerator { let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; + let mut meta = IpcMetadataBuilder::default(); let mut variadic_buffer_counts: Vec = vec![]; let mut offset = 0i64; for array_data in columns { offset = write_array_data( &array_data, - &mut buffers, + &mut meta, sink, - &mut nodes, offset, compression_codec, compression_context, @@ -865,8 +863,8 @@ impl IpcDataGenerator { // already a multiple of the alignment -- the body needs no trailing padding. let body_len = offset as usize; - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); + let buffers = fbb.create_vector(&meta.buffers); + let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { @@ -2048,10 +2046,8 @@ fn write_array_data( let num_rows = array_data.len(); let null_count = array_data.null_count(); if !matches!(array_data.data_type(), DataType::Null) { - meta.nodes.push(crate::FieldNode::new( - num_rows as i64, - array_data.null_count() as i64, - )); + meta.nodes + .push(crate::FieldNode::new(num_rows as i64, null_count as i64)); } else { // NullArray's null_count equals to len, but ArrayData null_count is always 0. meta.nodes