From d02e29748e7d3502108d7fa92390c2b0c0d72d98 Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 11 Jun 2026 23:58:45 -0400 Subject: [PATCH 1/6] Minor optimizations to arrow-flight --- arrow-flight/src/encode.rs | 53 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 191da024136f..f5961ddb95a7 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -329,6 +329,7 @@ impl FlightDataEncoder { } /// Place the `FlightData` in the queue to send + #[inline] fn queue_message(&mut self, mut data: FlightData) { if let Some(descriptor) = self.descriptor.take() { data.flight_descriptor = Some(descriptor); @@ -336,13 +337,6 @@ impl FlightDataEncoder { self.queue.push_back(data); } - /// Place the `FlightData` in the queue to send - fn queue_messages(&mut self, datas: impl IntoIterator) { - for data in datas { - self.queue_message(data) - } - } - /// Encodes schema as a [`FlightData`] in self.queue. /// Updates `self.schema` and returns the new schema fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef { @@ -381,8 +375,9 @@ impl FlightDataEncoder { for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) { let (flight_dictionaries, flight_batch) = self.encoder.encode_batch(&batch)?; - - self.queue_messages(flight_dictionaries); + for dict in flight_dictionaries { + self.queue_message(dict); + } self.queue_message(flight_batch); } @@ -671,7 +666,7 @@ fn prepare_schema_for_flight( fn split_batch_for_grpc_response( batch: RecordBatch, max_flight_data_size: usize, -) -> Vec { +) -> impl Iterator { let size = batch .columns() .iter() @@ -680,18 +675,20 @@ fn split_batch_for_grpc_response( let n_batches = (size / max_flight_data_size + usize::from(size % max_flight_data_size != 0)).max(1); - let rows_per_batch = (batch.num_rows() / n_batches).max(1); - let mut out = Vec::with_capacity(n_batches + 1); - + let num_rows = batch.num_rows(); + let rows_per_batch = (num_rows / n_batches).max(1); let mut offset = 0; - while offset < batch.num_rows() { - let length = (rows_per_batch).min(batch.num_rows() - offset); - out.push(batch.slice(offset, length)); - offset += length; - } - - out + std::iter::from_fn(move || { + if offset < num_rows { + let length = rows_per_batch.min(num_rows - offset); + let slice = batch.slice(offset, length); + offset += length; + Some(slice) + } else { + None + } + }) } /// The data needed to encode a stream of flight data, holding on to @@ -724,7 +721,10 @@ impl FlightIpcEncoder { /// Convert a `RecordBatch` to a Vec of `FlightData` representing /// dictionaries and a `FlightData` representing the batch - fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec, FlightData)> { + fn encode_batch( + &mut self, + batch: &RecordBatch, + ) -> Result<(impl Iterator + use<>, FlightData)> { let (encoded_dictionaries, encoded_batch) = self.data_gen.encode( batch, &mut self.dictionary_tracker, @@ -732,7 +732,7 @@ impl FlightIpcEncoder { &mut self.compression_context, )?; - let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); + let flight_dictionaries = encoded_dictionaries.into_iter().map(|e| e.into()); let flight_batch = encoded_batch.into(); Ok((flight_dictionaries, flight_batch)) @@ -1838,7 +1838,8 @@ mod tests { let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) .expect("cannot create record batch"); - let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); + let split: Vec<_> = + split_batch_for_grpc_response(batch.clone(), max_flight_data_size).collect(); assert_eq!(split.len(), 1); assert_eq!(batch, split[0]); @@ -1848,7 +1849,8 @@ mod tests { let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::>()); let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)]) .expect("cannot create record batch"); - let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size); + let split: Vec<_> = + split_batch_for_grpc_response(batch.clone(), max_flight_data_size).collect(); assert_eq!(split.len(), 3); assert_eq!( split.iter().map(|batch| batch.num_rows()).sum::(), @@ -1892,7 +1894,8 @@ mod tests { let input_rows = batch.num_rows(); - let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes); + let split: Vec<_> = + split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes).collect(); let sizes: Vec<_> = split.iter().map(RecordBatch::num_rows).collect(); let output_rows: usize = sizes.iter().sum(); From 403004c08ec1ebeb83cd4009e7052868944e2005 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 12 Jun 2026 10:40:46 -0400 Subject: [PATCH 2/6] avoid re-allocations for uncompressed path --- arrow-ipc/src/writer.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4142858ce80c..4c31f51748c5 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -556,7 +556,15 @@ impl IpcDataGenerator { write_options, compression_context, )?; - let mut arrow_data = Vec::new(); + // by default, write_options.compression is None, this means no compression will be applied and the encoded buffers will just be references to the original array buffers, so we can calculate the total size of the body data by summing the sizes of the original buffers. + // avoiding the intermediate allocations & copys to new buffers. + let mut arrow_data = Vec::with_capacity( + batch + .columns() + .iter() + .map(|c| c.get_array_memory_size()) + .sum::(), + ); let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( batch, write_options, From 3159ad395e6c5d40bd1a4f3f8a07a87fded7213c Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 12 Jun 2026 11:42:31 -0400 Subject: [PATCH 3/6] re-use flatbuffer allocations across calls --- arrow-flight/src/encode.rs | 8 +-- arrow-flight/src/utils.rs | 4 +- arrow-ipc/src/compression.rs | 18 +++--- arrow-ipc/src/writer.rs | 112 +++++++++++++++++++---------------- 4 files changed, 77 insertions(+), 65 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index f5961ddb95a7..7ed57fd3f486 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll}; use crate::{FlightData, FlightDescriptor, SchemaAsIpc, error::Result}; use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray}; -use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteContext, IpcWriteOptions}; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode}; use bytes::Bytes; @@ -701,7 +701,7 @@ struct FlightIpcEncoder { options: IpcWriteOptions, data_gen: IpcDataGenerator, dictionary_tracker: DictionaryTracker, - compression_context: CompressionContext, + compression_context: IpcWriteContext, } impl FlightIpcEncoder { @@ -710,7 +710,7 @@ impl FlightIpcEncoder { options, data_gen: IpcDataGenerator::default(), dictionary_tracker: DictionaryTracker::new(error_on_replacement), - compression_context: CompressionContext::default(), + compression_context: IpcWriteContext::default(), } } @@ -1813,7 +1813,7 @@ mod tests { ) -> (Vec, FlightData) { let data_gen = IpcDataGenerator::default(); let mut dictionary_tracker = DictionaryTracker::new(false); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); let (encoded_dictionaries, encoded_batch) = data_gen .encode( diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 6effb5f86aaf..796688100797 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; use arrow_ipc::convert::fb_to_schema; -use arrow_ipc::writer::CompressionContext; +use arrow_ipc::writer::IpcWriteContext; use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions}; use arrow_schema::{ArrowError, Schema, SchemaRef}; @@ -92,7 +92,7 @@ pub fn batches_to_flight_data( let data_gen = writer::IpcDataGenerator::default(); let mut dictionary_tracker = writer::DictionaryTracker::new(false); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); for batch in batches.iter() { let (encoded_dictionaries, encoded_batch) = data_gen.encode( diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index ff6e83dfdd0b..5e91ee23e7c2 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -18,6 +18,7 @@ use crate::CompressionType; use arrow_buffer::Buffer; use arrow_schema::ArrowError; +use flatbuffers::FlatBufferBuilder; const LENGTH_NO_COMPRESSED_DATA: i64 = -1; const LENGTH_OF_PREFIX_DATA: i64 = 8; @@ -28,12 +29,13 @@ const LENGTH_OF_PREFIX_DATA: i64 = 8; /// compression calls to avoid the performance overhead of initialising a new context for every /// compression. #[derive(Default)] -pub struct CompressionContext { +pub struct IpcWriteContext { #[cfg(feature = "zstd")] compressor: Option>, + pub(crate) fbb: FlatBufferBuilder<'static>, } -impl CompressionContext { +impl IpcWriteContext { #[cfg(feature = "zstd")] fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> { self.compressor.get_or_insert_with(|| { @@ -43,9 +45,9 @@ impl CompressionContext { } } -impl std::fmt::Debug for CompressionContext { +impl std::fmt::Debug for IpcWriteContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("CompressionContext"); + let mut ds = f.debug_struct("IpcWriteContext"); #[cfg(feature = "zstd")] ds.field( @@ -143,7 +145,7 @@ impl CompressionCodec { &self, input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, ) -> Result { let uncompressed_data_len = input.len(); let original_output_len = output.len(); @@ -209,7 +211,7 @@ impl CompressionCodec { &self, input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match self { CompressionCodec::Lz4Frame => compress_lz4(input, output), @@ -278,7 +280,7 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result, A fn compress_zstd( input: &[u8], output: &mut Vec, - context: &mut CompressionContext, + context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { let result = context.zstd_compressor().compress(input)?; output.extend_from_slice(&result); @@ -290,7 +292,7 @@ fn compress_zstd( fn compress_zstd( _input: &[u8], _output: &mut Vec, - _context: &mut CompressionContext, + _context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { Err(ArrowError::InvalidArgumentError( "zstd IPC compression requires the zstd feature".to_string(), diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4c31f51748c5..f79f21e7b445 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -43,7 +43,7 @@ use arrow_schema::*; use crate::CONTINUATION_MARKER; use crate::compression::CompressionCodec; -pub use crate::compression::CompressionContext; +pub use crate::compression::IpcWriteContext; use crate::convert::IpcSchemaEncoder; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] @@ -244,7 +244,7 @@ impl Default for IpcWriteOptions { /// # use std::sync::Arc; /// # use arrow_array::UInt64Array; /// # use arrow_array::RecordBatch; -/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +/// # use arrow_ipc::writer::{IpcWriteContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; /// /// // Create a record batch /// let batch = RecordBatch::try_from_iter(vec![ @@ -256,7 +256,7 @@ impl Default for IpcWriteOptions { /// let options = IpcWriteOptions::default(); /// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); /// -/// let mut compression_context = CompressionContext::default(); +/// let mut compression_context = IpcWriteContext::default(); /// /// // encode the batch into zero or more encoded dictionaries /// // and the data for the actual array. @@ -310,7 +310,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Struct(fields) => { @@ -471,7 +471,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { @@ -548,7 +548,7 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result<(Vec, EncodedData), ArrowError> { let encoded_dictionaries = self.encode_all_dicts( batch, @@ -558,13 +558,13 @@ impl IpcDataGenerator { )?; // by default, write_options.compression is None, this means no compression will be applied and the encoded buffers will just be references to the original array buffers, so we can calculate the total size of the body data by summing the sizes of the original buffers. // avoiding the intermediate allocations & copys to new buffers. - let mut arrow_data = Vec::with_capacity( - batch - .columns() - .iter() - .map(|c| c.get_array_memory_size()) - .sum::(), - ); + let size = batch + .columns() + .iter() + .map(|c| c.get_array_memory_size()) + .sum::(); + println!("Total size of body data: {size}"); + let mut arrow_data = Vec::with_capacity(size); let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( batch, write_options, @@ -572,6 +572,12 @@ impl IpcDataGenerator { &mut IpcBodySink::Write(&mut arrow_data), )?; arrow_data.extend_from_slice(&PADDING[..tail_pad]); + println!( + "size of record batch buffers:{}\nsize of flatbuffer data :{}\nsize of array_data:{}", + size, + ipc_message.len(), + arrow_data.len() + ); Ok(( encoded_dictionaries, EncodedData { @@ -587,7 +593,7 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result, ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); @@ -614,7 +620,7 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, writer: &mut W, ) -> Result { let encoded_dictionaries = self.encode_all_dicts( @@ -697,20 +703,11 @@ impl IpcDataGenerator { &self, batch: &RecordBatch, write_options: &IpcWriteOptions, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, sink: &mut IpcBodySink<'_>, ) -> Result<(Vec, usize, usize), ArrowError> { - let mut fbb = FlatBufferBuilder::new(); - let batch_compression_type = write_options.batch_compression_type; - let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); - c.add_method(crate::BodyCompressionMethod::BUFFER); - c.add_codec(batch_compression_type); - c.finish() - }); - let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; @@ -736,6 +733,16 @@ impl IpcDataGenerator { let tail_pad = pad_to_alignment(alignment, offset as usize); let body_len = offset as usize + tail_pad; + // Build flatbuffer header using the reused builder in the context. + let fbb = &mut compression_context.fbb; + + let compression = batch_compression_type.map(|batch_compression_type| { + let mut c = crate::BodyCompressionBuilder::new(fbb); + c.add_method(crate::BodyCompressionMethod::BUFFER); + c.add_codec(batch_compression_type); + c.finish() + }); + let buffers = fbb.create_vector(&meta.buffers); let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { @@ -745,7 +752,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -758,7 +765,7 @@ impl IpcDataGenerator { batch_builder.finish().as_union_value() }; // create an crate::Message - let mut message = crate::MessageBuilder::new(&mut fbb); + let mut message = crate::MessageBuilder::new(fbb); message.add_version(write_options.metadata_version); message.add_header_type(crate::MessageHeader::RecordBatch); message.add_bodyLength(body_len as i64); @@ -766,7 +773,9 @@ impl IpcDataGenerator { let root = message.finish(); fbb.finish(root, None); - Ok((fbb.finished_data().to_vec(), body_len, tail_pad)) + let ipc_message = fbb.finished_data().to_vec(); + fbb.reset(); + Ok((ipc_message, body_len, tail_pad)) } /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the @@ -777,22 +786,13 @@ impl IpcDataGenerator { array_data: &ArrayData, write_options: &IpcWriteOptions, is_delta: bool, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result { - let mut fbb = FlatBufferBuilder::new(); - let mut arrow_data: Vec = vec![]; // get the type of compression let batch_compression_type = write_options.batch_compression_type; - let compression = batch_compression_type.map(|batch_compression_type| { - let mut c = crate::BodyCompressionBuilder::new(&mut fbb); - c.add_method(crate::BodyCompressionMethod::BUFFER); - c.add_codec(batch_compression_type); - c.finish() - }); - let compression_codec: Option = batch_compression_type .map(|batch_compression_type| batch_compression_type.try_into()) .transpose()?; @@ -818,7 +818,16 @@ impl IpcDataGenerator { let body_len = offset as usize + tail_pad; arrow_data.extend_from_slice(&PADDING[..tail_pad]); - // write data + // Build flatbuffer header using the reused builder in the context. + let fbb = &mut compression_context.fbb; + + let compression = batch_compression_type.map(|batch_compression_type| { + let mut c = crate::BodyCompressionBuilder::new(fbb); + c.add_method(crate::BodyCompressionMethod::BUFFER); + c.add_codec(batch_compression_type); + c.finish() + }); + let buffers = fbb.create_vector(&meta.buffers); let nodes = fbb.create_vector(&meta.nodes); let variadic_buffer = if variadic_buffer_counts.is_empty() { @@ -828,7 +837,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::RecordBatchBuilder::new(fbb); batch_builder.add_length(array_data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); @@ -842,7 +851,7 @@ impl IpcDataGenerator { }; let root = { - let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb); + let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb); batch_builder.add_id(dict_id); batch_builder.add_data(root); batch_builder.add_isDelta(is_delta); @@ -850,7 +859,7 @@ impl IpcDataGenerator { }; let root = { - let mut message_builder = crate::MessageBuilder::new(&mut fbb); + let mut message_builder = crate::MessageBuilder::new(fbb); message_builder.add_version(write_options.metadata_version); message_builder.add_header_type(crate::MessageHeader::DictionaryBatch); message_builder.add_bodyLength(body_len as i64); @@ -859,10 +868,11 @@ impl IpcDataGenerator { }; fbb.finish(root, None); - let finished_data = fbb.finished_data(); + let ipc_message = fbb.finished_data().to_vec(); + fbb.reset(); Ok(EncodedData { - ipc_message: finished_data.to_vec(), + ipc_message, arrow_data, }) } @@ -1246,7 +1256,7 @@ pub struct FileWriter { data_gen: IpcDataGenerator, - compression_context: CompressionContext, + compression_context: IpcWriteContext, } impl FileWriter> { @@ -1308,7 +1318,7 @@ impl FileWriter { dictionary_tracker, custom_metadata: HashMap::new(), data_gen, - compression_context: CompressionContext::default(), + compression_context: IpcWriteContext::default(), }) } @@ -1537,7 +1547,7 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, - compression_context: CompressionContext, + compression_context: IpcWriteContext, } impl StreamWriter> { @@ -1588,7 +1598,7 @@ impl StreamWriter { finished: false, dictionary_tracker, data_gen, - compression_context: CompressionContext::default(), + compression_context: IpcWriteContext::default(), }) } @@ -1965,7 +1975,7 @@ fn write_array_data( sink: &mut IpcBodySink<'_>, offset: i64, compression_codec: Option, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, write_options: &IpcWriteOptions, ) -> Result { let mut offset = offset; @@ -2260,7 +2270,7 @@ fn encode_sink_buffer( sink: &mut IpcBodySink<'_>, offset: i64, compression_codec: Option, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, alignment: u8, ) -> Result { let (encoded, len) = match compression_codec { @@ -4454,7 +4464,7 @@ mod tests { let data_gen = IpcDataGenerator::default(); let mut dictionary_tracker = DictionaryTracker::new(false); let writer_options = IpcWriteOptions::default(); - let mut compression_ctx = CompressionContext::default(); + let mut compression_ctx = IpcWriteContext::default(); let schema = Arc::new(Schema::new(vec![Field::new( "a", From 094579bfd86d6123810f1c38462b46db25ce9134 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 12 Jun 2026 13:55:52 -0400 Subject: [PATCH 4/6] introduce buffer pool to avoid repeatily allocating 2/4/12 MB's of memory to fill array data --- arrow-flight/src/encode.rs | 76 ++++++++++++++++++- .../integration_test.rs | 6 +- arrow-ipc/src/compression.rs | 10 +-- arrow-ipc/src/writer.rs | 16 +--- 4 files changed, 83 insertions(+), 25 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 7ed57fd3f486..1badd7c44c59 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -15,7 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll}; +use std::{ + collections::VecDeque, + fmt::Debug, + pin::Pin, + sync::{Arc, Mutex}, + task::Poll, +}; use crate::{FlightData, FlightDescriptor, SchemaAsIpc, error::Result}; @@ -263,6 +269,60 @@ impl FlightDataEncoderBuilder { } } +const DEFAULT_ARROW_DATA_CAPACITY: usize = GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES; + +/// Pool of reusable `Vec` buffers for the IPC arrow data body. +#[derive(Clone, Debug)] +pub struct ArrowDataPool(Arc>>>); + +impl ArrowDataPool { + /// Create n buffers with pre-allocated capacity for reuse in encoding IPC messages. + pub fn new(n: usize) -> Self { + Self(Arc::new(Mutex::new( + (0..n) + .map(|_| Vec::with_capacity(DEFAULT_ARROW_DATA_CAPACITY)) + .collect(), + ))) + } + + fn acquire(&self) -> Vec { + let mut buf = self.0.lock().unwrap().pop().unwrap_or_default(); + buf.clear(); + buf + } + + fn release(&self, mut buf: Vec) { + buf.clear(); + self.0.lock().unwrap().push(buf); + } +} + +impl Default for ArrowDataPool { + fn default() -> Self { + Self::new(8) + } +} + +// A thin wrapper that gives `Bytes::from_owner` something to hold onto. +// `data` — the Vec written into by encode(). The buffer we keep alive. +// `pool` — shared handle back to the ArrowDataPool; on drop, the Vec finds its way home. +pub(crate) struct PooledBuf { + data: Vec, + pool: ArrowDataPool, +} + +impl AsRef<[u8]> for PooledBuf { + fn as_ref(&self) -> &[u8] { + &self.data + } +} + +impl Drop for PooledBuf { + fn drop(&mut self) { + self.pool.release(std::mem::take(&mut self.data)); + } +} + /// Stream that encodes a stream of record batches to flight data. /// /// See [`FlightDataEncoderBuilder`] for details and example. @@ -702,6 +762,7 @@ struct FlightIpcEncoder { data_gen: IpcDataGenerator, dictionary_tracker: DictionaryTracker, compression_context: IpcWriteContext, + pool: ArrowDataPool, } impl FlightIpcEncoder { @@ -711,6 +772,7 @@ impl FlightIpcEncoder { data_gen: IpcDataGenerator::default(), dictionary_tracker: DictionaryTracker::new(error_on_replacement), compression_context: IpcWriteContext::default(), + pool: ArrowDataPool::default(), } } @@ -725,6 +787,7 @@ impl FlightIpcEncoder { &mut self, batch: &RecordBatch, ) -> Result<(impl Iterator + use<>, FlightData)> { + self.compression_context.scratch = self.pool.acquire(); let (encoded_dictionaries, encoded_batch) = self.data_gen.encode( batch, &mut self.dictionary_tracker, @@ -733,7 +796,16 @@ impl FlightIpcEncoder { )?; let flight_dictionaries = encoded_dictionaries.into_iter().map(|e| e.into()); - let flight_batch = encoded_batch.into(); + + let pooled = PooledBuf { + data: encoded_batch.arrow_data, + pool: self.pool.clone(), + }; + let flight_batch = crate::FlightData { + data_header: encoded_batch.ipc_message.into(), + data_body: Bytes::from_owner(pooled), + ..Default::default() + }; Ok((flight_dictionaries, flight_batch)) } diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index 05ca5627ecd8..3c01410e3873 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -26,7 +26,7 @@ use arrow::{ datatypes::SchemaRef, ipc::{ self, reader, - writer::{self, CompressionContext}, + writer::{self, IpcWriteContext}, }, record_batch::RecordBatch, }; @@ -95,7 +95,7 @@ async fn upload_data( let mut original_data_iter = original_data.iter().enumerate(); - let mut compression_context = CompressionContext::default(); + let mut compression_context = IpcWriteContext::default(); if let Some((counter, first_batch)) = original_data_iter.next() { let metadata = counter.to_string().into_bytes(); @@ -159,7 +159,7 @@ async fn send_batch( batch: &RecordBatch, options: &writer::IpcWriteOptions, dictionary_tracker: &mut writer::DictionaryTracker, - compression_context: &mut CompressionContext, + compression_context: &mut IpcWriteContext, ) -> Result { let data_gen = writer::IpcDataGenerator::default(); diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 5e91ee23e7c2..2b15d00a0278 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -23,16 +23,16 @@ use flatbuffers::FlatBufferBuilder; const LENGTH_NO_COMPRESSED_DATA: i64 = -1; const LENGTH_OF_PREFIX_DATA: i64 = 8; -/// Additional context that may be needed for compression. -/// -/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent -/// compression calls to avoid the performance overhead of initialising a new context for every -/// compression. +/// - The flatbuffer builder (`fbb`) is reset and reused across calls. +/// - The zstd compressor (when enabled) is kept alive to avoid re-initialisation overhead. #[derive(Default)] pub struct IpcWriteContext { #[cfg(feature = "zstd")] compressor: Option>, pub(crate) fbb: FlatBufferBuilder<'static>, + /// Scratch buffer for the IPC arrow data body. When set by the caller before + /// encode(), the existing allocation is reused instead of creating a fresh Vec. + pub scratch: Vec, } impl IpcWriteContext { diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index f79f21e7b445..a9441b2c9d6d 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -556,15 +556,7 @@ impl IpcDataGenerator { write_options, compression_context, )?; - // by default, write_options.compression is None, this means no compression will be applied and the encoded buffers will just be references to the original array buffers, so we can calculate the total size of the body data by summing the sizes of the original buffers. - // avoiding the intermediate allocations & copys to new buffers. - let size = batch - .columns() - .iter() - .map(|c| c.get_array_memory_size()) - .sum::(); - println!("Total size of body data: {size}"); - let mut arrow_data = Vec::with_capacity(size); + let mut arrow_data = std::mem::take(&mut compression_context.scratch); let (ipc_message, _, tail_pad) = self.record_batch_to_bytes( batch, write_options, @@ -572,12 +564,6 @@ impl IpcDataGenerator { &mut IpcBodySink::Write(&mut arrow_data), )?; arrow_data.extend_from_slice(&PADDING[..tail_pad]); - println!( - "size of record batch buffers:{}\nsize of flatbuffer data :{}\nsize of array_data:{}", - size, - ipc_message.len(), - arrow_data.len() - ); Ok(( encoded_dictionaries, EncodedData { From 2d8d5bb0853589268845756e80f0743346ce4403 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 12 Jun 2026 21:29:53 -0400 Subject: [PATCH 5/6] checkpoint for buffer observibility --- arrow-flight/benches/flight.rs | 2 +- arrow-flight/src/encode.rs | 44 +++++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs index 4841e9dd9822..e454a9903d94 100644 --- a/arrow-flight/benches/flight.rs +++ b/arrow-flight/benches/flight.rs @@ -25,7 +25,7 @@ mod common; use common::{TYPES, build_batch, start_server}; const ROWS: [usize; 2] = [8 * 1024, 64 * 1024]; -const COLS: [usize; 2] = [1, 8]; +const COLS: [usize; 4] = [1, 4, 8, 16]; fn bench_encode(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 1badd7c44c59..3541d218a0de 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -273,33 +273,59 @@ const DEFAULT_ARROW_DATA_CAPACITY: usize = GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES; /// Pool of reusable `Vec` buffers for the IPC arrow data body. #[derive(Clone, Debug)] -pub struct ArrowDataPool(Arc>>>); +pub struct ArrowDataPool(Arc>); + +#[derive(Debug)] +struct ArrowDataPoolState { + buffers: Vec>, + in_use: usize, + max_in_use: usize, +} impl ArrowDataPool { /// Create n buffers with pre-allocated capacity for reuse in encoding IPC messages. pub fn new(n: usize) -> Self { - Self(Arc::new(Mutex::new( - (0..n) + Self(Arc::new(Mutex::new(ArrowDataPoolState { + buffers: (0..n) .map(|_| Vec::with_capacity(DEFAULT_ARROW_DATA_CAPACITY)) .collect(), - ))) + in_use: 0, + max_in_use: 0, + }))) } - fn acquire(&self) -> Vec { - let mut buf = self.0.lock().unwrap().pop().unwrap_or_default(); + fn acquire(&mut self) -> Vec { + let mut state = self.0.lock().unwrap(); + let mut buf = match state.buffers.pop() { + Some(buf) => buf, + None => Vec::with_capacity(DEFAULT_ARROW_DATA_CAPACITY), + }; + state.in_use += 1; + state.max_in_use = state.max_in_use.max(state.in_use); buf.clear(); buf } - fn release(&self, mut buf: Vec) { + fn release(&mut self, mut buf: Vec) { buf.clear(); - self.0.lock().unwrap().push(buf); + let mut state = self.0.lock().unwrap(); + state.in_use -= 1; + state.buffers.push(buf); + } +} +impl Drop for ArrowDataPool { + fn drop(&mut self) { + let max_in_use = self.0.lock().unwrap().max_in_use; + println!( + "ArrowDataPool dropped. Max buffers in use at once: {}", + max_in_use + ); } } impl Default for ArrowDataPool { fn default() -> Self { - Self::new(8) + Self::new(1) } } From 505fb203eb0b19b848daac9bf5aa661767302aaa Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 12 Jun 2026 21:47:39 -0400 Subject: [PATCH 6/6] pre-allocate on the fly buffer pools --- arrow-flight/src/encode.rs | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 3541d218a0de..31e4989a1526 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -273,35 +273,24 @@ const DEFAULT_ARROW_DATA_CAPACITY: usize = GRPC_TARGET_MAX_FLIGHT_SIZE_BYTES; /// Pool of reusable `Vec` buffers for the IPC arrow data body. #[derive(Clone, Debug)] -pub struct ArrowDataPool(Arc>); - -#[derive(Debug)] -struct ArrowDataPoolState { - buffers: Vec>, - in_use: usize, - max_in_use: usize, -} +pub struct ArrowDataPool(Arc>>>); impl ArrowDataPool { /// Create n buffers with pre-allocated capacity for reuse in encoding IPC messages. pub fn new(n: usize) -> Self { - Self(Arc::new(Mutex::new(ArrowDataPoolState { - buffers: (0..n) + Self(Arc::new(Mutex::new( + (0..n) .map(|_| Vec::with_capacity(DEFAULT_ARROW_DATA_CAPACITY)) .collect(), - in_use: 0, - max_in_use: 0, - }))) + ))) } fn acquire(&mut self) -> Vec { let mut state = self.0.lock().unwrap(); - let mut buf = match state.buffers.pop() { + let mut buf = match state.pop() { Some(buf) => buf, None => Vec::with_capacity(DEFAULT_ARROW_DATA_CAPACITY), }; - state.in_use += 1; - state.max_in_use = state.max_in_use.max(state.in_use); buf.clear(); buf } @@ -309,17 +298,7 @@ impl ArrowDataPool { fn release(&mut self, mut buf: Vec) { buf.clear(); let mut state = self.0.lock().unwrap(); - state.in_use -= 1; - state.buffers.push(buf); - } -} -impl Drop for ArrowDataPool { - fn drop(&mut self) { - let max_in_use = self.0.lock().unwrap().max_in_use; - println!( - "ArrowDataPool dropped. Max buffers in use at once: {}", - max_in_use - ); + state.push(buf); } }