diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index eda7e3c58fe0..5050ff6cd112 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; -use arrow_array::{RecordBatch, builder::StringBuilder}; +use arrow_array::RecordBatch; +use arrow_array::builder::{ + Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder, +}; +use arrow_array::types::UInt32Type; use arrow_ipc::CompressionType; -use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; +use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, StreamWriter}; use arrow_schema::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use std::sync::Arc; @@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) { writer.finish().unwrap(); }) }); + + group.bench_function("StreamWriter/write_10/dict", |b| { + let batches = create_unique_dict_batches(10, 8192); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + b.iter(move || { + buffer.clear(); + let mut writer = StreamWriter::try_new(&mut buffer, schema.as_ref()).unwrap(); + for batch in &batches { + writer.write(batch).unwrap(); + } + writer.finish().unwrap(); + }) + }); + + group.bench_function("StreamWriter/write_10/dict/delta", |b| { + let batches = create_delta_dict_batches(10, 8192); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + let options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + b.iter(move || { + buffer.clear(); + + let mut writer = + StreamWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone()) + .unwrap(); + + for batch in &batches { + writer.write(batch).unwrap(); + } + + writer.finish().unwrap(); + }) + }); + + // The file writer rejects dictionary replacement, so only the delta case is + // exercised here (growing dictionaries that are prefixes of one another). + group.bench_function("FileWriter/write_10/dict/delta", |b| { + let batches = create_delta_dict_batches(10, 8192); + let schema = batches[0].schema(); + let mut buffer = Vec::with_capacity(2 * 1024 * 1024); + let options = + IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta); + + b.iter(move || { + buffer.clear(); + + let mut writer = + FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), options.clone()) + .unwrap(); + + for batch in &batches { + writer.write(batch).unwrap(); + } + + writer.finish().unwrap(); + }) + }); +} + +/// Build `n` record batches with a single dictionary column whose dictionary +/// grows across batches. A single builder is reused with `finish_preserve_values` +/// so each batch's dictionary has the previous batch's as a prefix which allows +/// us to emit deltas. +fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec { + let schema = Arc::new(Schema::new(vec![Field::new( + "d0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + )])); + let mut builder = StringDictionaryBuilder::::new(); + + let mut batches = Vec::with_capacity(n); + for i in 0..n { + // 3/4 of the rows reuse values shared by every batch, the other 1/4 + // introduce values unique to this batch which extends the dictionary. + for r in 0..num_rows { + if r < num_rows / 4 { + builder.append_value(format!("batch {i} value {}", r)); + } else { + builder.append_value(format!("shared {r}")); + } + } + + // Preserve the values builder so the dictionary accumulates across batches. + let dict = builder.finish_preserve_values(); + batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap()); + } + + batches +} + +/// Build `n` record batches each with a completely distinct dictionary for each batch. +fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec { + let schema = Arc::new(Schema::new(vec![Field::new( + "d0", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + false, + )])); + + let mut batches = Vec::with_capacity(n); + for i in 0..n { + let mut builder = StringDictionaryBuilder::::new(); + for r in 0..num_rows { + builder.append_value(format!("batch {i} value {}", r % (num_rows / 2))); + } + let dict = builder.finish(); + batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap()); + } + + batches } fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4142858ce80c..81c0093cdebf 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -89,13 +89,6 @@ impl EncodedBuffer { EncodedBuffer::Compressed(v) => v.as_slice(), } } - - fn len(&self) -> usize { - match self { - EncodedBuffer::Raw(b) => b.len(), - EncodedBuffer::Compressed(v) => v.len(), - } - } } /// Accumulates the IPC metadata produced by [`write_array_data`]. /// @@ -142,7 +135,7 @@ struct IpcWriteMetadata { dictionary_block_sizes: Vec<(usize, usize)>, /// Flatbuffer header size including continuation prefix and alignment padding. padded_header_len: usize, - /// Total length of the record-batch body including trailing alignment padding. + /// Total length of the record-batch body including alignment padding body_len: usize, } @@ -270,6 +263,32 @@ impl Default for IpcWriteOptions { /// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc pub struct IpcDataGenerator {} +/// A dictionary determined (during the dictionary walk) to need sending, captured +/// so it can be serialised to the output afterwards. Holding the `ArrayData` +/// (a cheap clone of `Arc`-backed buffers) lets the walk stay separate from +/// serialisation, so the latter can reuse a single [`FlatBufferBuilder`] and +/// stream bodies zero-copy. +struct DictionaryToEncode { + id: i64, + data: ArrayData, + is_delta: bool, +} + +/// The result of encoding a record batch's body and flatbuffer `RecordBatch` table +/// via [`IpcDataGenerator::encode_record_batch_data`]. +/// +/// The `RecordBatch` table is left in-progress in the caller's [`FlatBufferBuilder`]; +/// the caller wraps `record_batch` in a `Message` (directly for a record batch +/// message, or inside a `DictionaryBatch` for a dictionary message). +struct EncodedRecordBatch { + /// In-progress flatbuffer offset of the `RecordBatch` table. + record_batch: flatbuffers::WIPOffset>, + /// Total body length written to the sink (the sum of each buffer's encoded + /// length and its alignment padding). Each buffer is individually padded to the + /// alignment, so the body is already aligned and needs no trailing padding. + body_len: usize, +} + impl IpcDataGenerator { /// Converts a schema to an IPC message along with `dictionary_tracker` /// and returns it encoded inside [EncodedData] as a flatbuffer. @@ -303,27 +322,25 @@ impl IpcDataGenerator { } } - fn _encode_dictionaries>( + fn _collect_dict_updates>( &self, column: &ArrayRef, - encoded_dictionaries: &mut Vec, + encoded_dictionaries: &mut Vec, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, - compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Struct(fields) => { let s = as_struct_array(column); for (field, column) in fields.iter().zip(s.columns()) { - self.encode_dictionaries( + self.collect_dict_updates( field, column, encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } } @@ -338,62 +355,57 @@ impl IpcDataGenerator { // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries // only for values array. let values_array = make_array(data.child_data()[1].clone()); - self.encode_dictionaries( + self.collect_dict_updates( values, &values_array, encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::List(field) => { let list = as_list_array(column); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::LargeList(field) => { let list = as_large_list_array(column); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::ListView(field) => { let list = column.as_list_view::(); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::LargeListView(field) => { let list = column.as_list_view::(); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::FixedSizeList(field, _) => { @@ -401,14 +413,13 @@ impl IpcDataGenerator { .as_any() .downcast_ref::() .expect("Unable to downcast to fixed size list array"); - self.encode_dictionaries( + self.collect_dict_updates( field, list.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::Map(field, _) => { @@ -420,39 +431,36 @@ impl IpcDataGenerator { }; // keys - self.encode_dictionaries( + self.collect_dict_updates( keys, map_array.keys(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; // values - self.encode_dictionaries( + self.collect_dict_updates( values, map_array.values(), encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } DataType::Union(fields, _) => { let union = as_union_array(column); for (type_id, field) in fields.iter() { let column = union.child(type_id); - self.encode_dictionaries( + self.collect_dict_updates( field, column, encoded_dictionaries, dictionary_tracker, write_options, dict_id, - compression_context, )?; } } @@ -462,16 +470,18 @@ impl IpcDataGenerator { Ok(()) } + // Collect all dicts that need a dictionary message i.e. ones that were + // either not in the tracker previously or ones that were but need a + // replacement or delta. #[allow(clippy::too_many_arguments)] - fn encode_dictionaries>( + fn collect_dict_updates>( &self, field: &Field, column: &ArrayRef, - encoded_dictionaries: &mut Vec, + dictionaries: &mut Vec, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, - compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { @@ -480,13 +490,12 @@ impl IpcDataGenerator { let values = make_array(dict_data.child_data()[0].clone()); - self._encode_dictionaries( + self._collect_dict_updates( &values, - encoded_dictionaries, + dictionaries, dictionary_tracker, write_options, dict_id_seq, - compression_context, )?; // It's important to only take the dict_id at this point, because the dict ID @@ -508,32 +517,27 @@ impl IpcDataGenerator { )? { DictionaryUpdate::None => {} DictionaryUpdate::New | DictionaryUpdate::Replaced => { - encoded_dictionaries.push(self.dictionary_batch_to_bytes( - dict_id, - dict_values, - write_options, - false, - compression_context, - )?); + dictionaries.push(DictionaryToEncode { + id: dict_id, + data: dict_values.clone(), + is_delta: false, + }); } DictionaryUpdate::Delta(data) => { - encoded_dictionaries.push(self.dictionary_batch_to_bytes( - dict_id, - &data, - write_options, - true, - compression_context, - )?); + dictionaries.push(DictionaryToEncode { + id: dict_id, + data, + is_delta: true, + }); } } } - _ => self._encode_dictionaries( + _ => self._collect_dict_updates( column, - encoded_dictionaries, + dictionaries, dictionary_tracker, write_options, dict_id_seq, - compression_context, )?, } @@ -550,52 +554,54 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { - let encoded_dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; + let dictionaries = self.collect_all_dicts(batch, dictionary_tracker, write_options)?; + let mut encoded_dictionaries = Vec::with_capacity(dictionaries.len()); + for dict in dictionaries { + encoded_dictionaries.push(self.dictionary_batch_to_bytes( + dict, + write_options, + compression_context, + )?); + } + let mut fbb = FlatBufferBuilder::new(); let mut arrow_data = Vec::new(); - let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( + self.record_batch_to_bytes( batch, write_options, compression_context, &mut IpcBodySink::Write(&mut arrow_data), + &mut fbb, )?; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); Ok(( encoded_dictionaries, EncodedData { - ipc_message, + ipc_message: fbb.finished_data().to_vec(), arrow_data, }, )) } - /// Encode dictionary batches for all columns in `batch`. - fn encode_all_dicts( + /// Walk the record batch and collect dictionaries for later encoding. + fn collect_all_dicts( &self, batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, - ) -> Result, ArrowError> { + ) -> Result, ArrowError> { let schema = batch.schema(); - let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); + let mut dictionaries = Vec::with_capacity(schema.flattened_fields().len()); let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter(); for (i, field) in schema.fields().iter().enumerate() { - self.encode_dictionaries( + self.collect_dict_updates( field, batch.column(i), - &mut encoded_dictionaries, + &mut dictionaries, dictionary_tracker, write_options, &mut dict_id, - compression_context, )?; } - Ok(encoded_dictionaries) + Ok(dictionaries) } /// Write dictionary batches and the record batch directly to `writer`, skipping the @@ -608,18 +614,10 @@ impl IpcDataGenerator { write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, writer: &mut W, + fbb: &mut FlatBufferBuilder<'static>, ) -> Result { - let encoded_dictionaries = self.encode_all_dicts( - batch, - dictionary_tracker, - write_options, - compression_context, - )?; - - let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len()); - for dict in encoded_dictionaries { - dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?); - } + let dictionaries = self.collect_all_dicts(batch, dictionary_tracker, write_options)?; + let mut dictionary_block_sizes = Vec::with_capacity(dictionaries.len()); let capacity = batch .columns() @@ -627,37 +625,45 @@ impl IpcDataGenerator { .map(|a| estimate_encoded_buffer_count(a.data_type())) .sum(); let mut encoded_buffers: Vec = Vec::with_capacity(capacity); - let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes( + + for dict in &dictionaries { + encoded_buffers.clear(); + + let body_len = self.dictionary_batch_to_sink( + dict, + write_options, + compression_context, + &mut IpcBodySink::Collect(&mut encoded_buffers), + fbb, + )?; + + let header_len = write_buffered_message( + &mut *writer, + fbb.finished_data(), + &encoded_buffers, + write_options, + )?; + dictionary_block_sizes.push((header_len, body_len)); + } + + encoded_buffers.clear(); + let body_len = self.record_batch_to_bytes( batch, write_options, compression_context, &mut IpcBodySink::Collect(&mut encoded_buffers), + fbb, )?; - - let alignment = write_options.alignment; - let a = usize::from(alignment - 1); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; - let aligned_size = (ipc_message.len() + prefix_size + a) & !a; - write_continuation( + let padded_header_len = write_buffered_message( &mut *writer, + fbb.finished_data(), + &encoded_buffers, write_options, - (aligned_size - prefix_size) as i32, )?; - writer.write_all(&ipc_message)?; - writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?; - for enc in &encoded_buffers { - writer.write_all(enc.as_slice())?; - writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?; - } - writer.write_all(&PADDING[..tail_pad])?; Ok(IpcWriteMetadata { dictionary_block_sizes, - padded_header_len: aligned_size, + padded_header_len, body_len, }) } @@ -683,134 +689,180 @@ impl IpcDataGenerator { /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the /// serialised buffer data. /// - /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the - /// total body length including trailing padding, and the trailing alignment padding byte count. + /// Returns the total body length written to `sink` (including per-buffer alignment + /// padding). + /// + /// The message header is located in the provided FlatBufferBuilder's finished + /// bytes. A successful Result from this function guarantees the fbb is in + /// a finished state to call [`FlatBufferBuilder::finished_data`]. fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, compression_context: &mut CompressionContext, sink: &mut IpcBodySink<'_>, - ) -> Result<(Vec, usize, usize), ArrowError> { - let mut fbb = FlatBufferBuilder::new(); - - let batch_compression_type = write_options.batch_compression_type; - - let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); - c.add_method(crate::BodyCompressionMethod::BUFFER); - c.add_codec(batch_compression_type); - c.finish() - }); - - let compression_codec: Option = - batch_compression_type.map(TryInto::try_into).transpose()?; - - let alignment = write_options.alignment; - let mut variadic_buffer_counts = vec![]; - let mut meta = IpcMetadataBuilder::default(); - let mut offset = 0i64; - - for array in batch.columns() { - let array_data = array.to_data(); - offset = write_array_data( - &array_data, - &mut meta, - sink, - offset, - compression_codec, - compression_context, - write_options, - )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); - } - - let tail_pad = pad_to_alignment(alignment, offset as usize); - let body_len = offset as usize + tail_pad; - - let buffers = fbb.create_vector(&meta.buffers); - let nodes = fbb.create_vector(&meta.nodes); - let variadic_buffer = if variadic_buffer_counts.is_empty() { - None - } else { - Some(fbb.create_vector(&variadic_buffer_counts)) - }; + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result { + // Reuse the builder's internal buffer across messages; `reset` keeps the + // allocated capacity and only clears the in-progress state. + fbb.reset(); + + let EncodedRecordBatch { + record_batch, + body_len, + } = self.encode_record_batch_data( + batch.columns().iter().map(|array| array.to_data()), + batch.num_rows() as i64, + write_options, + compression_context, + sink, + fbb, + )?; - let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - if let Some(c) = compression { - batch_builder.add_compression(c); - } - if let Some(v) = variadic_buffer { - batch_builder.add_variadicBufferCounts(v); - } - batch_builder.finish().as_union_value() - }; // create an crate::Message - let mut message = crate::MessageBuilder::new(&mut fbb); + let mut message = crate::MessageBuilder::new(fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); - message.add_header(root); + message.add_header(record_batch.as_union_value()); let root = message.finish(); fbb.finish(root, None); - Ok((fbb.finished_data().to_vec(), body_len, tail_pad)) + Ok(body_len) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the /// other for the data fn dictionary_batch_to_bytes( &self, - dict_id: i64, - array_data: &ArrayData, + dict: DictionaryToEncode, write_options: &IpcWriteOptions, - is_delta: bool, compression_context: &mut CompressionContext, ) -> Result { let mut fbb = FlatBufferBuilder::new(); - let mut arrow_data: Vec = vec![]; + self.dictionary_batch_to_sink( + &dict, + write_options, + compression_context, + &mut IpcBodySink::Write(&mut arrow_data), + &mut fbb, + )?; - // get the type of compression + Ok(EncodedData { + ipc_message: fbb.finished_data().to_vec(), + arrow_data, + }) + } + + /// Encodes a dictionary batch's flatbuffer metadata into `fbb` (read back via + /// `fbb.finished_data()`) and fills `sink` with its body buffers. + /// + /// Returns the total body length written to `sink` (including per-buffer alignment + /// padding). + fn dictionary_batch_to_sink( + &self, + dict: &DictionaryToEncode, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + sink: &mut IpcBodySink<'_>, + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result { + // Reuse the builder's internal buffer across messages; `reset` keeps the + // allocated capacity and only clears the in-progress state. + fbb.reset(); + + // A dictionary batch is a record batch (the single column of dictionary + // values) wrapped in a DictionaryBatch, so we share the record batch body + // and table encoding and only differ in the message framing. + let EncodedRecordBatch { + record_batch, + body_len, + } = self.encode_record_batch_data( + std::iter::once(dict.data.clone()), + dict.data.len() as i64, + write_options, + compression_context, + sink, + fbb, + )?; + + let root = { + let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb); + batch_builder.add_id(dict.id); + batch_builder.add_data(record_batch); + batch_builder.add_isDelta(dict.is_delta); + batch_builder.finish().as_union_value() + }; + + let root = { + let mut message_builder = crate::MessageBuilder::new(fbb); + message_builder.add_version(write_options.metadata_version); + message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); + message_builder.add_bodyLength(body_len as i64); + message_builder.add_header(root); + message_builder.finish() + }; + + fbb.finish(root, None); + + Ok(body_len) + } + + /// Encode the body buffers of a single record batch into `sink` and build the + /// flatbuffer `RecordBatch` table into `fbb`. + /// + /// This is the shared core of [`Self::record_batch_to_bytes`] and + /// [`Self::dictionary_batch_to_sink`]: a dictionary message embeds a record + /// batch (its single column of values), so both encode the same `RecordBatch` + /// table and only differ in how they wrap the returned offset in a message. + /// + /// `columns` yields the column data in IPC buffer order and `row_count` is the + /// logical length recorded in the `RecordBatch` header. The caller is responsible + /// for wrapping the returned [`EncodedRecordBatch::record_batch`] in a `Message` + /// and calling [`FlatBufferBuilder::finish`]. + fn encode_record_batch_data( + &self, + columns: impl IntoIterator, + row_count: i64, + write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, + sink: &mut IpcBodySink<'_>, + fbb: &mut FlatBufferBuilder<'static>, + ) -> Result { let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); + let mut c = crate::BodyCompressionBuilder::new(fbb); c.add_method(crate::BodyCompressionMethod::BUFFER); c.add_codec(batch_compression_type); c.finish() }); - let compression_codec: Option = batch_compression_type - .map(|batch_compression_type| batch_compression_type.try_into()) - .transpose()?; + let compression_codec: Option = + batch_compression_type.map(TryInto::try_into).transpose()?; - let alignment = write_options.alignment; let mut meta = IpcMetadataBuilder::default(); - let mut sink = IpcBodySink::Write(&mut arrow_data); - let offset = write_array_data( - array_data, - &mut meta, - &mut sink, - 0, - compression_codec, - compression_context, - write_options, - )?; + let mut variadic_buffer_counts: Vec = vec![]; + let mut offset = 0i64; - let mut variadic_buffer_counts = vec![]; - append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); + for array_data in columns { + offset = write_array_data( + &array_data, + &mut meta, + sink, + offset, + compression_codec, + compression_context, + write_options, + )?; + append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); + } - // pad the tail of body data - let tail_pad = pad_to_alignment(alignment, offset as usize); - let body_len = offset as usize + tail_pad; - arrow_data.extend_from_slice(&PADDING[..tail_pad]); + // Each buffer is padded to the alignment as it is written, so `offset` is + // already a multiple of the alignment -- the body needs no trailing padding. + let body_len = offset as usize; - // write data let buffers = fbb.create_vector(&meta.buffers); let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { @@ -819,43 +871,21 @@ impl IpcDataGenerator { Some(fbb.create_vector(&variadic_buffer_counts)) }; - let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(array_data.len() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - if let Some(c) = compression { - batch_builder.add_compression(c); - } - if let Some(v) = variadic_buffer { - batch_builder.add_variadicBufferCounts(v); - } - batch_builder.finish() - }; - - let root = { - let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb); - batch_builder.add_id(dict_id); - batch_builder.add_data(root); - batch_builder.add_isDelta(is_delta); - batch_builder.finish().as_union_value() - }; - - let root = { - let mut message_builder = crate::MessageBuilder::new(&mut fbb); - message_builder.add_version(write_options.metadata_version); - message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(body_len as i64); - message_builder.add_header(root); - message_builder.finish() - }; - - fbb.finish(root, None); - let finished_data = fbb.finished_data(); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); + batch_builder.add_length(row_count); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } + if let Some(v) = variadic_buffer { + batch_builder.add_variadicBufferCounts(v); + } + let record_batch = batch_builder.finish(); - Ok(EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, + Ok(EncodedRecordBatch { + record_batch, + body_len, }) } } @@ -869,7 +899,7 @@ fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { } DataType::Dictionary(_, _) => { // Do nothing - // Dictionary types are handled in `encode_dictionaries`. + // Dictionary values are handled in special dictionary messages } _ => { for child in array.child_data() { @@ -1239,6 +1269,10 @@ pub struct FileWriter { data_gen: IpcDataGenerator, compression_context: CompressionContext, + + /// Reusable flatbuffer builder, shared across all messages to avoid + /// reallocating its internal buffer on every batch. + fbb: FlatBufferBuilder<'static>, } impl FileWriter> { @@ -1301,6 +1335,7 @@ impl FileWriter { custom_metadata: HashMap::new(), data_gen, compression_context: CompressionContext::default(), + fbb: FlatBufferBuilder::new(), }) } @@ -1323,6 +1358,7 @@ impl FileWriter { &self.write_options, &mut self.compression_context, &mut self.writer, + &mut self.fbb, )?; for (header_len, body_len) in meta.dictionary_block_sizes { @@ -1355,7 +1391,7 @@ impl FileWriter { } // write EOS - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation_and_meta_size(&mut self.writer, &self.write_options, 0)?; let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -1530,6 +1566,10 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, compression_context: CompressionContext, + + /// Reusable flatbuffer builder, shared across all messages to avoid + /// reallocating its internal buffer on every batch. + fbb: FlatBufferBuilder<'static>, } impl StreamWriter> { @@ -1581,6 +1621,7 @@ impl StreamWriter { dictionary_tracker, data_gen, compression_context: CompressionContext::default(), + fbb: FlatBufferBuilder::new(), }) } @@ -1598,6 +1639,7 @@ impl StreamWriter { &self.write_options, &mut self.compression_context, &mut self.writer, + &mut self.fbb, )?; Ok(()) } @@ -1610,7 +1652,7 @@ impl StreamWriter { )); } - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation_and_meta_size(&mut self.writer, &self.write_options, 0)?; self.writer.flush()?; self.finished = true; @@ -1701,6 +1743,31 @@ pub struct EncodedData { /// Arrow buffers to be written, should be an empty vec for schema messages pub arrow_data: Vec, } + +/// Write a single message directly to `writer`: the message header (continuation +/// prefix + flatbuffer metadata + alignment padding), then each body buffer with its +/// per-buffer alignment padding. +/// +/// `ipc_message` is the raw flatbuffer header (without continuation prefix), and +/// `encoded_buffers` are the already-encoded body buffers (zero-copy or +/// compressed). Each buffer is padded to the alignment, so the body needs no trailing +/// padding. Returns the padded header length (continuation + metadata + padding), +/// needed for IPC footer blocks. +fn write_buffered_message( + writer: &mut W, + ipc_message: &[u8], + encoded_buffers: &[EncodedBuffer], + write_options: &IpcWriteOptions, +) -> Result { + let aligned_size = write_message_header(writer, ipc_message, write_options)?; + + let alignment = write_options.alignment; + for enc in encoded_buffers { + write_body_buffer(&mut *writer, enc.as_slice(), alignment)?; + } + Ok(aligned_size) +} + /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message( mut writer: W, @@ -1714,41 +1781,56 @@ pub fn write_message( )); } + let aligned_size = write_message_header(&mut writer, &encoded.ipc_message, write_options)?; + + // write arrow data + let body_len = if arrow_data_len > 0 { + write_body_buffer(&mut writer, &encoded.arrow_data, write_options.alignment)? + } else { + 0 + }; + + Ok((aligned_size, body_len)) +} + +/// Write the encapsulated-message header to `writer`: the continuation marker and +/// metadata length, the flatbuffer metadata (`ipc_message`), and the alignment +/// padding that follows it. +/// +/// Returns the padded header length (continuation prefix + metadata + padding), +/// which is also the value encoded in the continuation length field. +fn write_message_header( + writer: &mut W, + ipc_message: &[u8], + write_options: &IpcWriteOptions, +) -> Result { let a = usize::from(write_options.alignment - 1); - let buffer = encoded.ipc_message; - let flatbuf_size = buffer.len(); + let flatbuf_size = ipc_message.len(); let prefix_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; - write_continuation( - &mut writer, + write_continuation_and_meta_size( + &mut *writer, write_options, (aligned_size - prefix_size) as i32, )?; // write the flatbuf if flatbuf_size > 0 { - writer.write_all(&buffer)?; + writer.write_all(ipc_message)?; } - // write padding - writer.write_all(&PADDING[..padding_bytes])?; - // write arrow data - let body_len = if arrow_data_len > 0 { - write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? - } else { - 0 - }; + // write padding + writer.write_all(&PADDING[..aligned_size - flatbuf_size - prefix_size])?; - Ok((aligned_size, body_len)) + Ok(aligned_size) } -fn write_body_buffers( +fn write_body_buffer( mut writer: W, data: &[u8], alignment: u8, @@ -1768,7 +1850,7 @@ fn write_body_buffers( /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream -fn write_continuation( +fn write_continuation_and_meta_size( mut writer: W, write_options: &IpcWriteOptions, total_len: i32, @@ -1962,11 +2044,10 @@ fn write_array_data( ) -> Result { let mut offset = offset; let num_rows = array_data.len(); + let null_count = array_data.null_count(); if !matches!(array_data.data_type(), DataType::Null) { - meta.nodes.push(crate::FieldNode::new( - num_rows as i64, - array_data.null_count() as i64, - )); + meta.nodes + .push(crate::FieldNode::new(num_rows as i64, null_count as i64)); } else { // NullArray's null_count equals to len, but ArrayData null_count is always 0. meta.nodes