diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index d05a5dbc377a..4142858ce80c 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -97,7 +97,20 @@ impl EncodedBuffer { } } } -/// Destination for per-buffer encoded output produced by [`write_array_data`]. +/// Accumulates the IPC metadata produced by [`write_array_data`]. +/// +/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`) +/// header. The companion [`IpcBodySink`] holds the actual encoded bytes. +#[derive(Default)] +struct IpcMetadataBuilder { + nodes: Vec, + buffers: Vec, +} + +/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`]. +/// +/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata +/// (offset + length of each buffer in the body); together they form a complete IPC message. enum IpcBodySink<'a> { /// Serialize buffer bytes (with padding) into a contiguous byte vec. Write(&'a mut Vec), @@ -681,9 +694,6 @@ impl IpcDataGenerator { ) -> Result<(Vec, usize, usize), ArrowError> { let mut fbb = FlatBufferBuilder::new(); - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; - let batch_compression_type = write_options.batch_compression_type; let compression = batch_compression_type.map(|batch_compression_type| { @@ -698,18 +708,16 @@ impl IpcDataGenerator { let alignment = write_options.alignment; let mut variadic_buffer_counts = vec![]; + let mut meta = IpcMetadataBuilder::default(); let mut offset = 0i64; for array in batch.columns() { let array_data = array.to_data(); offset = write_array_data( &array_data, - &mut buffers, + &mut meta, sink, - &mut nodes, offset, - array.len(), - array.null_count(), compression_codec, compression_context, write_options, @@ -720,8 +728,8 @@ impl IpcDataGenerator { let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); + let buffers = fbb.create_vector(&meta.buffers); + let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { @@ -765,8 +773,6 @@ impl IpcDataGenerator { ) -> Result { let mut fbb = FlatBufferBuilder::new(); - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; let mut arrow_data: Vec = vec![]; // get the type of compression @@ -784,15 +790,13 @@ impl IpcDataGenerator { .transpose()?; let alignment = write_options.alignment; + let mut meta = IpcMetadataBuilder::default(); let mut sink = IpcBodySink::Write(&mut arrow_data); let offset = write_array_data( array_data, - &mut buffers, + &mut meta, &mut sink, - &mut nodes, 0, - array_data.len(), - array_data.null_count(), compression_codec, compression_context, write_options, @@ -807,8 +811,8 @@ impl IpcDataGenerator { arrow_data.extend_from_slice(&PADDING[..tail_pad]); // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); + let buffers = fbb.create_vector(&meta.buffers); + let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { None } else { @@ -1945,29 +1949,28 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer { /// Recursively encodes `array_data` into its IPC representation. /// /// Output goes to two separate channels: -/// - `buffers` / `nodes`: IPC metadata (offsets, lengths, null counts) that will be -/// serialised into the flatbuffer `RecordBatch` header. +/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header. /// - `sink`: the raw Arrow data bytes that form the IPC message body. -#[allow(clippy::too_many_arguments)] fn write_array_data( array_data: &ArrayData, - buffers: &mut Vec, + meta: &mut IpcMetadataBuilder, sink: &mut IpcBodySink<'_>, - nodes: &mut Vec, offset: i64, - num_rows: usize, - null_count: usize, compression_codec: Option, compression_context: &mut CompressionContext, write_options: &IpcWriteOptions, ) -> Result { let mut offset = offset; + let num_rows = array_data.len(); if !matches!(array_data.data_type(), DataType::Null) { - nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64)); + meta.nodes.push(crate::FieldNode::new( + num_rows as i64, + array_data.null_count() as i64, + )); } else { - // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData - // where null_count is always 0. - nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64)); + // NullArray's null_count equals to len, but ArrayData null_count is always 0. + meta.nodes + .push(crate::FieldNode::new(num_rows as i64, num_rows as i64)); } if has_validity_bitmap(array_data.data_type(), write_options) { // write null buffer if exists @@ -1984,7 +1987,7 @@ fn write_array_data( offset = encode_sink_buffer( null_buffer, - buffers, + meta, sink, offset, compression_codec, @@ -1999,7 +2002,7 @@ fn write_array_data( for buffer in [offsets, values] { offset = encode_sink_buffer( buffer, - buffers, + meta, sink, offset, compression_codec, @@ -2017,7 +2020,7 @@ fn write_array_data( let views = get_or_truncate_buffer(array_data); offset = encode_sink_buffer( views, - buffers, + meta, sink, offset, compression_codec, @@ -2028,7 +2031,7 @@ fn write_array_data( for buffer in array_data.buffers().iter().skip(1) { offset = encode_sink_buffer( buffer.clone(), - buffers, + meta, sink, offset, compression_codec, @@ -2041,7 +2044,7 @@ fn write_array_data( for buffer in [offsets, values] { offset = encode_sink_buffer( buffer, - buffers, + meta, sink, offset, compression_codec, @@ -2062,7 +2065,7 @@ fn write_array_data( let buffer = get_or_truncate_buffer(array_data); offset = encode_sink_buffer( buffer, - buffers, + meta, sink, offset, compression_codec, @@ -2078,7 +2081,7 @@ fn write_array_data( let buffer = buffer.bit_slice(array_data.offset(), array_data.len()); offset = encode_sink_buffer( buffer, - buffers, + meta, sink, offset, compression_codec, @@ -2101,7 +2104,7 @@ fn write_array_data( }; offset = encode_sink_buffer( offsets, - buffers, + meta, sink, offset, compression_codec, @@ -2110,12 +2113,9 @@ fn write_array_data( )?; offset = write_array_data( &sliced_child_data, - buffers, + meta, sink, - nodes, offset, - sliced_child_data.len(), - sliced_child_data.null_count(), compression_codec, compression_context, write_options, @@ -2136,7 +2136,7 @@ fn write_array_data( offset = encode_sink_buffer( offsets, - buffers, + meta, sink, offset, compression_codec, @@ -2145,7 +2145,7 @@ fn write_array_data( )?; offset = encode_sink_buffer( sizes, - buffers, + meta, sink, offset, compression_codec, @@ -2155,12 +2155,9 @@ fn write_array_data( offset = write_array_data( &child_data, - buffers, + meta, sink, - nodes, offset, - child_data.len(), - child_data.null_count(), compression_codec, compression_context, write_options, @@ -2176,12 +2173,9 @@ fn write_array_data( offset = write_array_data( &child_data, - buffers, + meta, sink, - nodes, offset, - child_data.len(), - child_data.null_count(), compression_codec, compression_context, write_options, @@ -2191,7 +2185,7 @@ fn write_array_data( for buffer in array_data.buffers() { offset = encode_sink_buffer( buffer.clone(), - buffers, + meta, sink, offset, compression_codec, @@ -2211,12 +2205,9 @@ fn write_array_data( // write the nested data (e.g list data) offset = write_array_data( data_ref, - buffers, + meta, sink, - nodes, offset, - data_ref.len(), - data_ref.null_count(), compression_codec, compression_context, write_options, @@ -2229,12 +2220,9 @@ fn write_array_data( // write the nested data (e.g list data) offset = write_array_data( data_ref, - buffers, + meta, sink, - nodes, offset, - data_ref.len(), - data_ref.null_count(), compression_codec, compression_context, write_options, @@ -2260,7 +2248,7 @@ fn write_array_data( /// Returns the updated `offset` (advanced by the encoded length plus any alignment padding). fn encode_sink_buffer( buffer: Buffer, - buffers: &mut Vec, + ipc_meta_data: &mut IpcMetadataBuilder, sink: &mut IpcBodySink<'_>, offset: i64, compression_codec: Option, @@ -2284,7 +2272,7 @@ fn encode_sink_buffer( let pad_len = pad_to_alignment(alignment, len as usize); sink.write(pad_len, encoded); - buffers.push(crate::Buffer::new(offset, len)); + ipc_meta_data.buffers.push(crate::Buffer::new(offset, len)); Ok(offset + len + pad_len as i64) }