Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 50 additions & 62 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,20 @@ impl EncodedBuffer {
}
}
}
/// Destination for per-buffer encoded output produced by [`write_array_data`].
/// Accumulates the IPC metadata produced by [`write_array_data`].
///
/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
///
/// A fresh, empty builder is obtained with `IpcMetadataBuilder::default()`; entries are then
/// appended as [`write_array_data`] recurses through an array and its children.
#[derive(Default)]
struct IpcMetadataBuilder {
/// Field nodes, each built from a row count and a null count; [`write_array_data`] pushes one
/// at the start of every call, including its recursive calls for child data.
nodes: Vec<crate::FieldNode>,
/// Buffer descriptors, each built from the buffer's starting offset in the message body and
/// its encoded length; the alignment padding written after a buffer is not part of the
/// recorded length.
buffers: Vec<crate::Buffer>,
}

/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
///
/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
/// (offset + length of each buffer in the body); together they form a complete IPC message.
enum IpcBodySink<'a> {
/// Serialize buffer bytes (with padding) into a contiguous byte vec.
Write(&'a mut Vec<u8>),
Expand Down Expand Up @@ -681,9 +694,6 @@ impl IpcDataGenerator {
) -> Result<(Vec<u8>, usize, usize), ArrowError> {
let mut fbb = FlatBufferBuilder::new();

let mut nodes: Vec<crate::FieldNode> = vec![];
let mut buffers: Vec<crate::Buffer> = vec![];

let batch_compression_type = write_options.batch_compression_type;

let compression = batch_compression_type.map(|batch_compression_type| {
Expand All @@ -698,18 +708,16 @@ impl IpcDataGenerator {

let alignment = write_options.alignment;
let mut variadic_buffer_counts = vec![];
let mut meta = IpcMetadataBuilder::default();
let mut offset = 0i64;

for array in batch.columns() {
let array_data = array.to_data();
offset = write_array_data(
&array_data,
&mut buffers,
&mut meta,
sink,
&mut nodes,
offset,
array.len(),
array.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -720,8 +728,8 @@ impl IpcDataGenerator {
let tail_pad = pad_to_alignment(alignment, offset as usize);
let body_len = offset as usize + tail_pad;

let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let buffers = fbb.create_vector(&meta.buffers);
let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
None
} else {
Expand Down Expand Up @@ -765,8 +773,6 @@ impl IpcDataGenerator {
) -> Result<EncodedData, ArrowError> {
let mut fbb = FlatBufferBuilder::new();

let mut nodes: Vec<crate::FieldNode> = vec![];
let mut buffers: Vec<crate::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];

// get the type of compression
Expand All @@ -784,15 +790,13 @@ impl IpcDataGenerator {
.transpose()?;

let alignment = write_options.alignment;
let mut meta = IpcMetadataBuilder::default();
let mut sink = IpcBodySink::Write(&mut arrow_data);
let offset = write_array_data(
array_data,
&mut buffers,
&mut meta,
&mut sink,
&mut nodes,
0,
array_data.len(),
array_data.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -807,8 +811,8 @@ impl IpcDataGenerator {
arrow_data.extend_from_slice(&PADDING[..tail_pad]);

// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let buffers = fbb.create_vector(&meta.buffers);
let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
None
} else {
Expand Down Expand Up @@ -1945,29 +1949,28 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
/// Recursively encodes `array_data` into its IPC representation.
///
/// Output goes to two separate channels:
/// - `buffers` / `nodes`: IPC metadata (offsets, lengths, null counts) that will be
/// serialised into the flatbuffer `RecordBatch` header.
/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
/// - `sink`: the raw Arrow data bytes that form the IPC message body.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
array_data: &ArrayData,
buffers: &mut Vec<crate::Buffer>,
meta: &mut IpcMetadataBuilder,
sink: &mut IpcBodySink<'_>,
nodes: &mut Vec<crate::FieldNode>,
offset: i64,
num_rows: usize,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a nice cleanup

null_count: usize,
compression_codec: Option<CompressionCodec>,
compression_context: &mut CompressionContext,
write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
let mut offset = offset;
let num_rows = array_data.len();
if !matches!(array_data.data_type(), DataType::Null) {
nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
meta.nodes.push(crate::FieldNode::new(
num_rows as i64,
array_data.null_count() as i64,
));
} else {
// NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
// where null_count is always 0.
nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
// NullArray's null_count equals to len, but ArrayData null_count is always 0.
meta.nodes
.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
}
if has_validity_bitmap(array_data.data_type(), write_options) {
// write null buffer if exists
Expand All @@ -1984,7 +1987,7 @@ fn write_array_data(

offset = encode_sink_buffer(
null_buffer,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -1999,7 +2002,7 @@ fn write_array_data(
for buffer in [offsets, values] {
offset = encode_sink_buffer(
buffer,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2017,7 +2020,7 @@ fn write_array_data(
let views = get_or_truncate_buffer(array_data);
offset = encode_sink_buffer(
views,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2028,7 +2031,7 @@ fn write_array_data(
for buffer in array_data.buffers().iter().skip(1) {
offset = encode_sink_buffer(
buffer.clone(),
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2041,7 +2044,7 @@ fn write_array_data(
for buffer in [offsets, values] {
offset = encode_sink_buffer(
buffer,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2062,7 +2065,7 @@ fn write_array_data(
let buffer = get_or_truncate_buffer(array_data);
offset = encode_sink_buffer(
buffer,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2078,7 +2081,7 @@ fn write_array_data(
let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
offset = encode_sink_buffer(
buffer,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2101,7 +2104,7 @@ fn write_array_data(
};
offset = encode_sink_buffer(
offsets,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2110,12 +2113,9 @@ fn write_array_data(
)?;
offset = write_array_data(
&sliced_child_data,
buffers,
meta,
sink,
nodes,
offset,
sliced_child_data.len(),
sliced_child_data.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -2136,7 +2136,7 @@ fn write_array_data(

offset = encode_sink_buffer(
offsets,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2145,7 +2145,7 @@ fn write_array_data(
)?;
offset = encode_sink_buffer(
sizes,
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2155,12 +2155,9 @@ fn write_array_data(

offset = write_array_data(
&child_data,
buffers,
meta,
sink,
nodes,
offset,
child_data.len(),
child_data.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -2176,12 +2173,9 @@ fn write_array_data(

offset = write_array_data(
&child_data,
buffers,
meta,
sink,
nodes,
offset,
child_data.len(),
child_data.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -2191,7 +2185,7 @@ fn write_array_data(
for buffer in array_data.buffers() {
offset = encode_sink_buffer(
buffer.clone(),
buffers,
meta,
sink,
offset,
compression_codec,
Expand All @@ -2211,12 +2205,9 @@ fn write_array_data(
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
buffers,
meta,
sink,
nodes,
offset,
data_ref.len(),
data_ref.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -2229,12 +2220,9 @@ fn write_array_data(
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
buffers,
meta,
sink,
nodes,
offset,
data_ref.len(),
data_ref.null_count(),
compression_codec,
compression_context,
write_options,
Expand All @@ -2260,7 +2248,7 @@ fn write_array_data(
/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
fn encode_sink_buffer(
buffer: Buffer,
buffers: &mut Vec<crate::Buffer>,
ipc_meta_data: &mut IpcMetadataBuilder,
sink: &mut IpcBodySink<'_>,
offset: i64,
compression_codec: Option<CompressionCodec>,
Expand All @@ -2284,7 +2272,7 @@ fn encode_sink_buffer(

let pad_len = pad_to_alignment(alignment, len as usize);
sink.write(pad_len, encoded);
buffers.push(crate::Buffer::new(offset, len));
ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
Ok(offset + len + pad_len as i64)
}

Expand Down
Loading