Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-root

[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
# Enable the IPC compression codecs so tests can exercise compressed Flight encoding
arrow-ipc = { workspace = true, features = ["lz4", "zstd"] }
assert_cmd = "2.0.8"
criterion = { workspace = true, default-features = false, features = ["async_tokio"] }
http = "1.1.0"
Expand Down
36 changes: 33 additions & 3 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ mod tests {
use arrow_array::{cast::downcast_array, types::*};
use arrow_buffer::ScalarBuffer;
use arrow_cast::pretty::pretty_format_batches;
use arrow_ipc::MetadataVersion;
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{UnionFields, UnionMode};
use builder::{GenericStringBuilder, MapBuilder};
use std::collections::HashMap;
Expand Down Expand Up @@ -893,6 +893,27 @@ mod tests {
verify_flight_round_trip(vec![batch1, batch2]).await;
}

#[tokio::test]
async fn test_compression_round_trip() {
    // Sends one batch through the Flight encoder/decoder once per IPC body
    // compression codec. The point is to hit the compressed
    // `IpcDataGenerator::encode` path (per-buffer codec output); neither the
    // uncompressed Flight tests nor the writer-based compression tests reach it.

    // Both columns cycle through eight distinct values over 1024 rows.
    let int_column: ArrayRef = Arc::new(Int32Array::from_iter_values(
        (0..1024).map(|row| row % 8),
    ));
    let string_column: ArrayRef = Arc::new(StringArray::from_iter_values(
        (0..1024).map(|row| format!("value-{}", row % 8)),
    ));

    let columns = vec![("ints", int_column), ("strings", string_column)];
    let batch = RecordBatch::try_from_iter(columns).unwrap();

    // Each codec gets its own writer options and its own full round trip.
    let codecs = [CompressionType::LZ4_FRAME, CompressionType::ZSTD];
    for codec in codecs {
        let write_options = IpcWriteOptions::default()
            .try_with_compression(Some(codec))
            .unwrap();
        let input = vec![batch.clone()];
        verify_flight_round_trip_with_options(input, write_options).await;
    }
}

#[tokio::test]
async fn test_dictionary_hydration_known_schema() {
let arr1: DictionaryArray<UInt16Type> = vec!["a", "a", "b"].into_iter().collect();
Expand Down Expand Up @@ -1752,11 +1773,20 @@ mod tests {
verify_flight_round_trip(vec![batch1, batch2]).await;
}

async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
/// Round-trips `batches` through Flight using the default [`IpcWriteOptions`].
///
/// Convenience wrapper that forwards to `verify_flight_round_trip_with_options`.
async fn verify_flight_round_trip(batches: Vec<RecordBatch>) {
    let default_options = IpcWriteOptions::default();
    verify_flight_round_trip_with_options(batches, default_options).await
}

/// Encode `batches` through a [`FlightDataEncoderBuilder`] using `options`, decode them
/// again, and assert the decoded batches match the originals.
async fn verify_flight_round_trip_with_options(
mut batches: Vec<RecordBatch>,
options: IpcWriteOptions,
) {
let expected_schema = batches.first().unwrap().schema();

let encoder = FlightDataEncoderBuilder::default()
.with_options(IpcWriteOptions::default())
.with_options(options)
.with_dictionary_handling(DictionaryHandling::Resend)
.build(futures::stream::iter(batches.clone().into_iter().map(Ok)));

Expand Down
Loading