Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions arrow-flight/benches/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ use tonic::{

pub type Builder = fn(usize) -> ArrayRef;

pub const TYPES: &[(&str, Builder)] = &[
("fixed", fixed),
("nested", nested),
("variable", variable),
("dict", dict),
];
pub const TYPES: &[(&str, Builder)] =
&[("fixed", fixed), ("nested", nested), ("variable", variable)];

pub const DICT_TYPES: &[(&str, Builder)] = &[("dict", dict)];

fn fixed(n: usize) -> ArrayRef {
Arc::new(Int64Array::from_iter_values(0..n as i64))
Expand Down
61 changes: 57 additions & 4 deletions arrow-flight/benches/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
// under the License.

use arrow_array::RecordBatch;
use arrow_flight::{FlightClient, FlightData, encode::FlightDataEncoderBuilder};
use arrow_flight::{
FlightClient, FlightData,
encode::{DictionaryHandling, FlightDataEncoderBuilder},
};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use futures::TryStreamExt;
use tonic::transport::Channel;

mod common;
use common::{TYPES, build_batch, start_server};
use common::{DICT_TYPES, TYPES, build_batch, start_server};

const ROWS: [usize; 2] = [8 * 1024, 64 * 1024];
const COLS: [usize; 2] = [1, 8];
const COLS: [usize; 3] = [1, 4, 8];

fn bench_encode(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down Expand Up @@ -83,5 +86,55 @@ fn bench_roundtrip(c: &mut Criterion) {
}
}

criterion_group!(benches, bench_encode, bench_roundtrip);
fn bench_do_put_dictionary(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let (channel, _) = rt.block_on(start_server());
let mut g = c.benchmark_group("do_put_dictionary");

for &(name, build) in DICT_TYPES {
for &rows in &ROWS {
for &cols in &COLS {
let batch = build_batch(name, rows, cols, build);
g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64));

for (label, handling) in [
("hydrate", DictionaryHandling::Hydrate),
("resend", DictionaryHandling::Resend),
] {
let frames: Vec<FlightData> = rt
.block_on(
FlightDataEncoderBuilder::new()
.with_dictionary_handling(handling)
.build(futures::stream::iter([Ok(batch.clone())]))
.try_collect(),
)
.unwrap();
let id = BenchmarkId::new(format!("{name}/{label}"), format!("{rows}x{cols}"));
g.bench_function(id, |b| {
b.to_async(&rt).iter_batched(
|| (FlightClient::new(channel.clone()), frames.clone()),
|(mut client, frames)| async move {
client
.do_put(futures::stream::iter(frames.into_iter().map(Ok)))
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
},
criterion::BatchSize::SmallInput,
);
});
}
}
}
}
}

criterion_group!(
benches,
bench_encode,
bench_roundtrip,
bench_do_put_dictionary
);
criterion_main!(benches);
53 changes: 28 additions & 25 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,20 +329,14 @@ impl FlightDataEncoder {
}

/// Place the `FlightData` in the queue to send
#[inline]
fn queue_message(&mut self, mut data: FlightData) {
if let Some(descriptor) = self.descriptor.take() {
data.flight_descriptor = Some(descriptor);
}
self.queue.push_back(data);
}

/// Place the `FlightData` in the queue to send
fn queue_messages(&mut self, datas: impl IntoIterator<Item = FlightData>) {
for data in datas {
self.queue_message(data)
}
}

/// Encodes schema as a [`FlightData`] in self.queue.
/// Updates `self.schema` and returns the new schema
fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
Expand Down Expand Up @@ -381,8 +375,9 @@ impl FlightDataEncoder {

for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) = self.encoder.encode_batch(&batch)?;

self.queue_messages(flight_dictionaries);
for dict in flight_dictionaries {
self.queue_message(dict);
}
self.queue_message(flight_batch);
}

Expand Down Expand Up @@ -671,7 +666,7 @@ fn prepare_schema_for_flight(
fn split_batch_for_grpc_response(
batch: RecordBatch,
max_flight_data_size: usize,
) -> Vec<RecordBatch> {
) -> impl Iterator<Item = RecordBatch> {
let size = batch
.columns()
.iter()
Expand All @@ -680,18 +675,20 @@ fn split_batch_for_grpc_response(

let n_batches =
(size / max_flight_data_size + usize::from(size % max_flight_data_size != 0)).max(1);
let rows_per_batch = (batch.num_rows() / n_batches).max(1);
let mut out = Vec::with_capacity(n_batches + 1);

let num_rows = batch.num_rows();
let rows_per_batch = (num_rows / n_batches).max(1);
let mut offset = 0;
while offset < batch.num_rows() {
let length = (rows_per_batch).min(batch.num_rows() - offset);
out.push(batch.slice(offset, length));

offset += length;
}

out
std::iter::from_fn(move || {
if offset < num_rows {
let length = rows_per_batch.min(num_rows - offset);
let slice = batch.slice(offset, length);
offset += length;
Some(slice)
} else {
None
}
})
}

/// The data needed to encode a stream of flight data, holding on to
Expand Down Expand Up @@ -724,15 +721,18 @@ impl FlightIpcEncoder {

/// Convert a `RecordBatch` to a Vec of `FlightData` representing
/// dictionaries and a `FlightData` representing the batch
fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec<FlightData>, FlightData)> {
fn encode_batch(
&mut self,
batch: &RecordBatch,
) -> Result<(impl Iterator<Item = FlightData> + use<>, FlightData)> {
let (encoded_dictionaries, encoded_batch) = self.data_gen.encode(
batch,
&mut self.dictionary_tracker,
&self.options,
&mut self.compression_context,
)?;

let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
let flight_dictionaries = encoded_dictionaries.into_iter().map(|e| e.into());
let flight_batch = encoded_batch.into();

Ok((flight_dictionaries, flight_batch))
Expand Down Expand Up @@ -1838,7 +1838,8 @@ mod tests {
let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
let split: Vec<_> =
split_batch_for_grpc_response(batch.clone(), max_flight_data_size).collect();
assert_eq!(split.len(), 1);
assert_eq!(batch, split[0]);

Expand All @@ -1848,7 +1849,8 @@ mod tests {
let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
.expect("cannot create record batch");
let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
let split: Vec<_> =
split_batch_for_grpc_response(batch.clone(), max_flight_data_size).collect();
assert_eq!(split.len(), 3);
assert_eq!(
split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
Expand Down Expand Up @@ -1892,7 +1894,8 @@ mod tests {

let input_rows = batch.num_rows();

let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes);
let split: Vec<_> =
split_batch_for_grpc_response(batch.clone(), max_flight_data_size_bytes).collect();
let sizes: Vec<_> = split.iter().map(RecordBatch::num_rows).collect();
let output_rows: usize = sizes.iter().sum();

Expand Down
Loading