Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow_schema::ArrowError;

const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;
const DEFAULT_ZSTD_COMPRESSION_LEVEL: i32 = 3;

/// Additional context that may be needed for compression.
///
Expand All @@ -35,10 +36,9 @@ pub struct CompressionContext {

impl CompressionContext {
#[cfg(feature = "zstd")]
fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
fn zstd_compressor(&mut self, level: i32) -> &mut zstd::bulk::Compressor<'static> {
self.compressor.get_or_insert_with(|| {
zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
.expect("can use default compression level")
zstd::bulk::Compressor::new(level).expect("can use default compression level")
})
}
}
Expand Down Expand Up @@ -111,15 +111,15 @@ impl std::fmt::Debug for DecompressionContext {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
Lz4Frame,
Zstd,
Zstd(i32),
}

impl TryFrom<CompressionType> for CompressionCodec {
type Error = ArrowError;

fn try_from(compression_type: CompressionType) -> Result<Self, ArrowError> {
match compression_type {
CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
CompressionType::ZSTD => Ok(CompressionCodec::Zstd(DEFAULT_ZSTD_COMPRESSION_LEVEL)),
CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
other_type => Err(ArrowError::NotYetImplemented(format!(
"compression type {other_type:?} not supported "
Expand All @@ -129,6 +129,25 @@ impl TryFrom<CompressionType> for CompressionCodec {
}

impl CompressionCodec {
/// Creates a [`CompressionCodec`] with an explicit compression level.
///
/// The level is used for [`CompressionType::ZSTD`].
/// [`CompressionType::LZ4_FRAME`] does not yet support compression levels
/// and ignores this value. Returns an error for unsupported compression
/// types.
pub fn try_new_with_compression_level(
compression_type: CompressionType,
compression_level: i32,
) -> Result<Self, ArrowError> {
match compression_type {
CompressionType::ZSTD => Ok(CompressionCodec::Zstd(compression_level)),
CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
other_type => Err(ArrowError::NotYetImplemented(format!(
"compression type {other_type:?} not supported "
))),
}
}

/// Compresses the data in `input` to `output` and appends the
/// data using the specified compression mechanism.
///
Expand Down Expand Up @@ -213,7 +232,7 @@ impl CompressionCodec {
) -> Result<(), ArrowError> {
match self {
CompressionCodec::Lz4Frame => compress_lz4(input, output),
CompressionCodec::Zstd => compress_zstd(input, output, context),
CompressionCodec::Zstd(level) => compress_zstd(input, output, context, *level),
}
}

Expand All @@ -227,7 +246,7 @@ impl CompressionCodec {
) -> Result<Vec<u8>, ArrowError> {
let ret = match self {
CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
CompressionCodec::Zstd => decompress_zstd(input, decompressed_size, context)?,
CompressionCodec::Zstd(_) => decompress_zstd(input, decompressed_size, context)?,
};
if ret.len() != decompressed_size {
return Err(ArrowError::IpcError(format!(
Expand Down Expand Up @@ -279,8 +298,9 @@ fn compress_zstd(
input: &[u8],
output: &mut Vec<u8>,
context: &mut CompressionContext,
level: i32,
) -> Result<(), ArrowError> {
let result = context.zstd_compressor().compress(input)?;
let result = context.zstd_compressor(level).compress(input)?;
output.extend_from_slice(&result);
Ok(())
}
Expand All @@ -291,6 +311,7 @@ fn compress_zstd(
_input: &[u8],
_output: &mut Vec<u8>,
_context: &mut CompressionContext,
_level: i32,
) -> Result<(), ArrowError> {
Err(ArrowError::InvalidArgumentError(
"zstd IPC compression requires the zstd feature".to_string(),
Expand Down Expand Up @@ -363,7 +384,7 @@ mod tests {
#[cfg(feature = "zstd")]
fn test_zstd_compression() {
let input_bytes = b"hello zstd";
let codec = super::CompressionCodec::Zstd;
let codec = super::CompressionCodec::Zstd(super::DEFAULT_ZSTD_COMPRESSION_LEVEL);
let mut output_bytes: Vec<u8> = Vec::new();
codec
.compress(input_bytes, &mut output_bytes, &mut Default::default())
Expand Down
63 changes: 60 additions & 3 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct IpcWriteOptions {
/// Compression, if desired. Will result in a runtime error
/// if the corresponding feature is not enabled
batch_compression_type: Option<crate::CompressionType>,
// Compression level
batch_compression_level: Option<i32>,
/// How to handle updating dictionaries in IPC messages
dictionary_handling: DictionaryHandling,
}
Expand Down Expand Up @@ -166,6 +168,43 @@ impl IpcWriteOptions {
}
Ok(self)
}

/// Configures the compression level used when writing compressed IPC batches.
///
/// Compression levels require metadata V5 or newer and are currently only
/// supported for ZSTD compression.
pub fn try_with_compression_level(
mut self,
batch_compression_level: Option<i32>,
) -> Result<Self, ArrowError> {
self.batch_compression_level = batch_compression_level;

if self.batch_compression_level.is_some()
&& self.metadata_version < crate::MetadataVersion::V5
{
return Err(ArrowError::InvalidArgumentError(
"Compression only supported in metadata v5 and above".to_string(),
));
}

match (self.batch_compression_type, self.batch_compression_level) {
(Some(crate::CompressionType::ZSTD), Some(level)) if !(-999..=22).contains(&level) => {
return Err(ArrowError::InvalidArgumentError(format!(
"ZSTD compression level must be between -999 and 22, got {level}"
)));
}
(Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
return Err(ArrowError::InvalidArgumentError(
"LZ4 Frame compression does not support configurable compression levels"
.to_string(),
));
}
_ => {}
}

Ok(self)
}

/// Try to create IpcWriteOptions, checking for incompatible settings
pub fn try_new(
alignment: usize,
Expand All @@ -192,6 +231,7 @@ impl IpcWriteOptions {
write_legacy_ipc_format,
metadata_version,
batch_compression_type: None,
batch_compression_level: None,
dictionary_handling: DictionaryHandling::default(),
}),
crate::MetadataVersion::V5 => {
Expand All @@ -205,6 +245,7 @@ impl IpcWriteOptions {
write_legacy_ipc_format,
metadata_version,
batch_compression_type: None,
batch_compression_level: None,
dictionary_handling: DictionaryHandling::default(),
})
}
Expand All @@ -229,6 +270,7 @@ impl Default for IpcWriteOptions {
write_legacy_ipc_format: false,
metadata_version: crate::MetadataVersion::V5,
batch_compression_type: None,
batch_compression_level: None,
dictionary_handling: DictionaryHandling::default(),
}
}
Expand Down Expand Up @@ -703,8 +745,15 @@ impl IpcDataGenerator {
c.finish()
});

let compression_codec: Option<CompressionCodec> =
batch_compression_type.map(TryInto::try_into).transpose()?;
let batch_compression_level = write_options.batch_compression_level;
let compression_codec: Option<CompressionCodec> = batch_compression_type
.map(|compression_type| match batch_compression_level {
Some(level) => {
CompressionCodec::try_new_with_compression_level(compression_type, level)
}
None => compression_type.try_into(),
})
.transpose()?;

let alignment = write_options.alignment;
let mut variadic_buffer_counts = vec![];
Expand Down Expand Up @@ -785,8 +834,14 @@ impl IpcDataGenerator {
c.finish()
});

let batch_compression_level = write_options.batch_compression_level;
let compression_codec: Option<CompressionCodec> = batch_compression_type
.map(|batch_compression_type| batch_compression_type.try_into())
.map(|batch_compression_type| match batch_compression_level {
Some(level) => {
CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
}
None => batch_compression_type.try_into(),
})
.transpose()?;

let alignment = write_options.alignment;
Expand Down Expand Up @@ -2495,6 +2550,8 @@ mod tests {
let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
.unwrap()
.try_with_compression(Some(crate::CompressionType::ZSTD))
.unwrap()
.try_with_compression_level(Some(1))
.unwrap();

let mut writer =
Expand Down
Loading