Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/python/lance/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class CompactionOptions(TypedDict):
The batch size to use when scanning input fragments. You may want
to reduce this if you are running out of memory during compaction.

The default will use the same default from ``scanner``.
"""
io_buffer_size: Optional[int]
"""
The number of bytes to allow to queue up in the I/O buffer when scanning
input fragments. Increasing this can avoid a deadlock that occurs when a
single batch of data is larger than the I/O buffer size.

The default will use the same default from ``scanner``.
"""
compaction_mode: Optional[
Expand Down
3 changes: 3 additions & 0 deletions python/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ fn parse_compaction_options(
"batch_size" => {
opts.batch_size = value.extract()?;
}
"io_buffer_size" => {
opts.io_buffer_size = value.extract()?;
}
"compaction_mode" => {
let mode_str: Option<String> = value.extract()?;
if let Some(mode_str) = mode_str {
Expand Down
80 changes: 80 additions & 0 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ pub struct CompactionOptions {
/// specified then the default (see
/// [`crate::dataset::Scanner::batch_size`]) will be used.
pub batch_size: Option<usize>,
/// The number of bytes to allow to queue up in the I/O buffer when scanning
/// the input fragments. If not specified then the default (see
/// [`crate::dataset::Scanner::io_buffer_size`]) will be used.
///
/// Increasing this can avoid a deadlock that occurs when a single batch of
/// data is larger than the I/O buffer size.
pub io_buffer_size: Option<u64>,
/// Whether to defer remapping indices during compaction. If true, indices will
/// not be remapped during this compaction operation. Instead, the fragment reuse index
/// is updated and will be used to perform remapping later.
Expand Down Expand Up @@ -237,6 +244,7 @@ impl Default for CompactionOptions {
num_threads: None,
max_bytes_per_file: None,
batch_size: None,
io_buffer_size: None,
defer_index_remap: false,
compaction_mode: None,
enable_binary_copy: false,
Expand Down Expand Up @@ -264,6 +272,7 @@ impl CompactionOptions {
/// - `lance.compaction.materialize_deletions_threshold`
/// - `lance.compaction.defer_index_remap`
/// - `lance.compaction.batch_size`
/// - `lance.compaction.io_buffer_size`
/// - `lance.compaction.compaction_mode`
/// - `lance.compaction.binary_copy_read_batch_bytes`
/// - `lance.compaction.max_source_fragments`
Expand Down Expand Up @@ -347,6 +356,14 @@ impl CompactionOptions {
))
})?);
}
"io_buffer_size" => {
self.io_buffer_size = Some(value.parse().map_err(|_| {
Error::invalid_input(format!(
"Invalid value for {}: '{}' (expected a non-negative integer)",
key, value
))
})?);
}
"compaction_mode" => {
self.compaction_mode = Some(CompactionMode::try_from(value.as_str())?);
}
Expand Down Expand Up @@ -1194,6 +1211,8 @@ async fn transform_blob_v2_batch(
/// and preserve insertion order.
/// - `batch_size`: Optional batch size; if provided, set it on the scanner to control
/// read batching.
/// - `io_buffer_size`: Optional I/O buffer size in bytes; if provided, set it on the
/// scanner to control how much data is queued during reads.
/// - `with_frags`: Whether to scan only the specified old fragments and force
/// in-order reading.
/// - `capture_row_ids`: When index remapping is needed, include and capture the
Expand All @@ -1209,6 +1228,7 @@ async fn prepare_reader(
dataset: &Dataset,
fragments: &[Fragment],
batch_size: Option<usize>,
io_buffer_size: Option<u64>,
with_frags: bool,
capture_row_ids: bool,
) -> Result<(
Expand All @@ -1234,6 +1254,9 @@ async fn prepare_reader(
if let Some(bs) = batch_size {
scanner.batch_size(bs);
}
if let Some(io_buffer_size) = io_buffer_size {
scanner.io_buffer_size(io_buffer_size);
}
if with_frags {
scanner
.with_fragments(fragments.to_vec())
Expand Down Expand Up @@ -1515,6 +1538,7 @@ async fn rewrite_files(
dataset.as_ref(),
&fragments,
options.batch_size,
options.io_buffer_size,
true,
needs_remapping,
)
Expand Down Expand Up @@ -2636,6 +2660,57 @@ mod tests {
assert_eq!(scanned_data, data);
}

#[rstest]
#[tokio::test]
async fn test_compact_with_io_buffer_size(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    // Exercises compaction with an explicit `io_buffer_size` set on
    // `CompactionOptions`: the two input fragments must be merged into one
    // and the total row count must be unchanged afterwards.
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let data = sample_data();

    // Capping files at 5,000 rows makes the write produce two fragments
    // (asserted right below), which gives compaction something to merge.
    let mut dataset = Dataset::write(
        RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()),
        test_uri,
        Some(WriteParams {
            max_rows_per_file: 5_000,
            max_rows_per_group: 1_000,
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();
    let fragment_count = dataset.get_fragments().len();
    assert_eq!(fragment_count, 2);

    // 256 MiB: deliberately generous so the read cannot stall on a batch
    // that is larger than the I/O buffer.
    let io_buffer_bytes: u64 = 256 * 1024 * 1024;
    let options = CompactionOptions {
        io_buffer_size: Some(io_buffer_bytes),
        ..Default::default()
    };

    // Planning should schedule exactly one compaction task.
    let task_count = plan_compaction(&dataset, &options)
        .await
        .unwrap()
        .tasks()
        .len();
    assert_eq!(task_count, 1);

    // Running the compaction replaces both fragments with a single new one.
    let metrics = compact_files(&mut dataset, options, None).await.unwrap();
    assert_eq!(metrics.fragments_removed, 2);
    assert_eq!(metrics.fragments_added, 1);

    // Read everything back and confirm no rows were lost.
    let stream = dataset.scan().try_into_stream().await.unwrap();
    let batches: Vec<_> = stream.try_collect().await.unwrap();
    let schema = batches[0].schema();
    let scanned_data = concat_batches(&schema, &batches).unwrap();
    assert_eq!(scanned_data.num_rows(), data.num_rows());
}

#[rstest]
#[tokio::test]
async fn test_compact_deletions(
Expand Down Expand Up @@ -4683,6 +4758,10 @@ mod tests {
"lance.compaction.batch_size".to_string(),
"4096".to_string(),
),
(
"lance.compaction.io_buffer_size".to_string(),
"1073741824".to_string(),
),
(
"lance.compaction.compaction_mode".to_string(),
"try_binary_copy".to_string(),
Expand All @@ -4701,6 +4780,7 @@ mod tests {
assert!((opts.materialize_deletions_threshold - 0.25).abs() < f32::EPSILON);
assert!(opts.defer_index_remap);
assert_eq!(opts.batch_size, Some(4096));
assert_eq!(opts.io_buffer_size, Some(1_073_741_824));
assert_eq!(opts.compaction_mode, Some(CompactionMode::TryBinaryCopy));
assert_eq!(opts.binary_copy_read_batch_bytes, Some(8_388_608));
}
Expand Down
Loading