diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index ff6e83dfdd0b..f126d5cc9900 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -21,6 +21,7 @@ use arrow_schema::ArrowError; const LENGTH_NO_COMPRESSED_DATA: i64 = -1; const LENGTH_OF_PREFIX_DATA: i64 = 8; +const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3; /// Additional context that may be needed for compression. /// @@ -35,10 +36,9 @@ pub struct CompressionContext { impl CompressionContext { #[cfg(feature = "zstd")] - fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> { + fn zstd_compressor(&mut self, level: i32) -> &mut zstd::bulk::Compressor<'static> { self.compressor.get_or_insert_with(|| { - zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL) - .expect("can use default compression level") + zstd::bulk::Compressor::new(level).expect("can use default compression level") }) } } @@ -111,7 +111,7 @@ impl std::fmt::Debug for DecompressionContext { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CompressionCodec { Lz4Frame, - Zstd, + Zstd(i32), } impl TryFrom for CompressionCodec { @@ -119,7 +119,7 @@ impl TryFrom for CompressionCodec { fn try_from(compression_type: CompressionType) -> Result { match compression_type { - CompressionType::ZSTD => Ok(CompressionCodec::Zstd), + CompressionType::ZSTD => Ok(CompressionCodec::Zstd(DEFAULT_ZSTD_COMPRESSION_LEVEL)), CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame), other_type => Err(ArrowError::NotYetImplemented(format!( "compression type {other_type:?} not supported " @@ -129,6 +129,25 @@ impl TryFrom for CompressionCodec { } impl CompressionCodec { + /// Creates a [`CompressionCodec`] with an explicit compression level. + /// + /// The level is used for [`CompressionType::ZSTD`]. + /// [`CompressionType::LZ4_FRAME`] does not yet support compression levels + /// and ignores this value. Returns an error for unsupported compression + /// types. + pub fn try_new_with_compression_level( + compression_type: CompressionType, + compression_level: i32, + ) -> Result { + match compression_type { + CompressionType::ZSTD => Ok(CompressionCodec::Zstd(compression_level)), + CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame), + other_type => Err(ArrowError::NotYetImplemented(format!( + "compression type {other_type:?} not supported " + ))), + } + } + /// Compresses the data in `input` to `output` and appends the /// data using the specified compression mechanism. /// @@ -213,7 +232,7 @@ impl CompressionCodec { ) -> Result<(), ArrowError> { match self { CompressionCodec::Lz4Frame => compress_lz4(input, output), - CompressionCodec::Zstd => compress_zstd(input, output, context), + CompressionCodec::Zstd(level) => compress_zstd(input, output, context, *level), } } @@ -227,7 +246,7 @@ impl CompressionCodec { ) -> Result, ArrowError> { let ret = match self { CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?, - CompressionCodec::Zstd => decompress_zstd(input, decompressed_size, context)?, + CompressionCodec::Zstd(_) => decompress_zstd(input, decompressed_size, context)?, }; if ret.len() != decompressed_size { return Err(ArrowError::IpcError(format!( @@ -279,8 +298,9 @@ fn compress_zstd( input: &[u8], output: &mut Vec, context: &mut CompressionContext, + level: i32, ) -> Result<(), ArrowError> { - let result = context.zstd_compressor().compress(input)?; + let result = context.zstd_compressor(level).compress(input)?; output.extend_from_slice(&result); Ok(()) } @@ -291,6 +311,7 @@ fn compress_zstd( _input: &[u8], _output: &mut Vec, _context: &mut CompressionContext, + _level: i32, ) -> Result<(), ArrowError> { Err(ArrowError::InvalidArgumentError( "zstd IPC compression requires the zstd feature".to_string(), @@ -363,7 +384,7 @@ mod tests { #[cfg(feature = "zstd")] fn test_zstd_compression() { let input_bytes = b"hello zstd"; - let codec = super::CompressionCodec::Zstd; + let codec = super::CompressionCodec::Zstd(super::DEFAULT_ZSTD_COMPRESSION_LEVEL); let mut output_bytes: Vec = Vec::new(); codec .compress(input_bytes, &mut output_bytes, &mut Default::default()) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4142858ce80c..bba941bdd791 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -66,6 +66,8 @@ pub struct IpcWriteOptions { /// Compression, if desired. Will result in a runtime error /// if the corresponding feature is not enabled batch_compression_type: Option, + // Compression level + batch_compression_level: Option, /// How to handle updating dictionaries in IPC messages dictionary_handling: DictionaryHandling, } @@ -166,6 +168,43 @@ impl IpcWriteOptions { } Ok(self) } + + /// Configures the compression level used when writing compressed IPC batches. + /// + /// Compression levels require metadata V5 or newer and are currently only + /// supported for ZSTD compression. + pub fn try_with_compression_level( + mut self, + batch_compression_level: Option, + ) -> Result { + self.batch_compression_level = batch_compression_level; + + if self.batch_compression_level.is_some() + && self.metadata_version < crate::MetadataVersion::V5 + { + return Err(ArrowError::InvalidArgumentError( + "Compression only supported in metadata v5 and above".to_string(), + )); + } + + match (self.batch_compression_type, self.batch_compression_level) { + (Some(crate::CompressionType::ZSTD), Some(level)) if !(-999..=22).contains(&level) => { + return Err(ArrowError::InvalidArgumentError(format!( + "ZSTD compression level must be between -999 and 22, got {level}" + ))); + } + (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => { + return Err(ArrowError::InvalidArgumentError( + "LZ4 Frame compression does not support configurable compression levels" + .to_string(), + )); + } + _ => {} + } + + Ok(self) + } + /// Try to create IpcWriteOptions, checking for incompatible settings pub fn try_new( alignment: usize, @@ -192,6 +231,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, + batch_compression_level: None, dictionary_handling: DictionaryHandling::default(), }), crate::MetadataVersion::V5 => { @@ -205,6 +245,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, + batch_compression_level: None, dictionary_handling: DictionaryHandling::default(), }) } @@ -229,6 +270,7 @@ impl Default for IpcWriteOptions { write_legacy_ipc_format: false, metadata_version: crate::MetadataVersion::V5, batch_compression_type: None, + batch_compression_level: None, dictionary_handling: DictionaryHandling::default(), } } @@ -703,8 +745,15 @@ impl IpcDataGenerator { c.finish() }); - let compression_codec: Option = - batch_compression_type.map(TryInto::try_into).transpose()?; + let batch_compression_level = write_options.batch_compression_level; + let compression_codec: Option = batch_compression_type + .map(|compression_type| match batch_compression_level { + Some(level) => { + CompressionCodec::try_new_with_compression_level(compression_type, level) + } + None => compression_type.try_into(), + }) + .transpose()?; let alignment = write_options.alignment; let mut variadic_buffer_counts = vec![]; @@ -785,8 +834,14 @@ impl IpcDataGenerator { c.finish() }); + let batch_compression_level = write_options.batch_compression_level; let compression_codec: Option = batch_compression_type - .map(|batch_compression_type| batch_compression_type.try_into()) + .map(|batch_compression_type| match batch_compression_level { + Some(level) => { + CompressionCodec::try_new_with_compression_level(batch_compression_type, level) + } + None => batch_compression_type.try_into(), + }) .transpose()?; let alignment = write_options.alignment; @@ -2495,6 +2550,8 @@ mod tests { let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5) .unwrap() .try_with_compression(Some(crate::CompressionType::ZSTD)) + .unwrap() + .try_with_compression_level(Some(1)) .unwrap(); let mut writer =