From aceedaafe85847bf106f33e14fcee4176e515fb2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 9 Jun 2026 11:09:15 -0400 Subject: [PATCH] Add arrow-flight test coverage for IPC compression Round trips a batch through FlightDataEncoderBuilder with LZ4 and ZSTD IPC body compression enabled, exercising the compressed IpcDataGenerator::encode path that no existing arrow-flight or arrow-ipc test covered. Refactors verify_flight_round_trip to share its encode/decode/assert logic via verify_flight_round_trip_with_options, and enables the arrow-ipc lz4/zstd features as arrow-flight dev-dependencies. Co-Authored-By: Claude Opus 4.8 (1M context) --- arrow-flight/Cargo.toml | 2 ++ arrow-flight/src/encode.rs | 36 +++++++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 8e399fbc5a52..46fcd0810315 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -75,6 +75,8 @@ cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-root [dev-dependencies] arrow-cast = { workspace = true, features = ["prettyprint"] } +# Enable the IPC compression codecs so tests can exercise compressed Flight encoding +arrow-ipc = { workspace = true, features = ["lz4", "zstd"] } assert_cmd = "2.0.8" criterion = { workspace = true, default-features = false, features = ["async_tokio"] } http = "1.1.0" diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 191da024136f..05c5b9509f52 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -793,7 +793,7 @@ mod tests { use arrow_array::{cast::downcast_array, types::*}; use arrow_buffer::ScalarBuffer; use arrow_cast::pretty::pretty_format_batches; - use arrow_ipc::MetadataVersion; + use arrow_ipc::{CompressionType, MetadataVersion}; use arrow_schema::{UnionFields, UnionMode}; use builder::{GenericStringBuilder, MapBuilder}; use std::collections::HashMap; @@ -893,6 +893,27 @@ mod tests { 
verify_flight_round_trip(vec![batch1, batch2]).await; } + #[tokio::test] + async fn test_compression_round_trip() { + // Round trip a batch through Flight with IPC body compression enabled. This exercises + // the compressed `IpcDataGenerator::encode` path (per-buffer codec output), which the + // uncompressed Flight tests and the writer-based compression tests do not cover. + let ints = Int32Array::from_iter_values((0..1024).map(|i| i % 8)); + let strings = StringArray::from_iter_values((0..1024).map(|i| format!("value-{}", i % 8))); + let batch = RecordBatch::try_from_iter(vec![ + ("ints", Arc::new(ints) as ArrayRef), + ("strings", Arc::new(strings) as ArrayRef), + ]) + .unwrap(); + + for compression in [CompressionType::LZ4_FRAME, CompressionType::ZSTD] { + let options = IpcWriteOptions::default() + .try_with_compression(Some(compression)) + .unwrap(); + verify_flight_round_trip_with_options(vec![batch.clone()], options).await; + } + } + #[tokio::test] async fn test_dictionary_hydration_known_schema() { let arr1: DictionaryArray = vec!["a", "a", "b"].into_iter().collect(); @@ -1752,11 +1773,20 @@ mod tests { verify_flight_round_trip(vec![batch1, batch2]).await; } - async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) { + async fn verify_flight_round_trip(batches: Vec<RecordBatch>) { + verify_flight_round_trip_with_options(batches, IpcWriteOptions::default()).await; + } + + /// Encode `batches` through a [`FlightDataEncoderBuilder`] using `options`, decode them + /// again, and assert the decoded batches match the originals. + async fn verify_flight_round_trip_with_options( + mut batches: Vec<RecordBatch>, + options: IpcWriteOptions, + ) { let expected_schema = batches.first().unwrap().schema(); let encoder = FlightDataEncoderBuilder::default() - .with_options(IpcWriteOptions::default()) + .with_options(options) .with_dictionary_handling(DictionaryHandling::Resend) .build(futures::stream::iter(batches.clone().into_iter().map(Ok)));