From 456fbd10dd6cc9acf1e6c1a9cd2c850fa8f1725e Mon Sep 17 00:00:00 2001 From: aimanmalib <84276911+aimanmalib@users.noreply.github.com> Date: Thu, 11 Jun 2026 04:55:30 +0000 Subject: [PATCH] feat: expose io_buffer_size in CompactionOptions Compaction builds its scan reader via prepare_reader, which previously only forwarded batch_size to the scanner. The scanner's io_buffer_size knob was never set during compaction, so users had no way to increase the I/O buffer. This matters because a single batch larger than the I/O buffer size causes the scanner to deadlock (documented in Scanner::io_buffer_size), and with backpressure warnings downgraded to debug this deadlock is silent at the default. Add an io_buffer_size field to CompactionOptions, plumb it through prepare_reader to scanner.io_buffer_size, and support the lance.compaction.io_buffer_size manifest config key. The Python binding (parse_compaction_options + CompactionOptions TypedDict) is updated to keep parameter names consistent across languages. Closes #4946 --- python/python/lance/optimize.py | 8 +++ python/src/dataset/optimize.rs | 3 ++ rust/lance/src/dataset/optimize.rs | 80 ++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) diff --git a/python/python/lance/optimize.py b/python/python/lance/optimize.py index 8b98308d442..3ac7547960b 100644 --- a/python/python/lance/optimize.py +++ b/python/python/lance/optimize.py @@ -57,6 +57,14 @@ class CompactionOptions(TypedDict): The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction. + The default will use the same default from ``scanner``. + """ + io_buffer_size: Optional[int] + """ + The number of bytes to allow to queue up in the I/O buffer when scanning + input fragments. Increasing this can avoid a deadlock that occurs when a + single batch of data is larger than the I/O buffer size. + The default will use the same default from ``scanner``. """ compaction_mode: Optional[ diff --git a/python/src/dataset/optimize.rs b/python/src/dataset/optimize.rs index 321d7157b86..33aa32b94cd 100644 --- a/python/src/dataset/optimize.rs +++ b/python/src/dataset/optimize.rs @@ -58,6 +58,9 @@ fn parse_compaction_options( "batch_size" => { opts.batch_size = value.extract()?; } + "io_buffer_size" => { + opts.io_buffer_size = value.extract()?; + } "compaction_mode" => { let mode_str: Option = value.extract()?; if let Some(mode_str) = mode_str { diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index d591e42cc73..56cf74c1a62 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -191,6 +191,13 @@ pub struct CompactionOptions { /// specified then the default (see /// [`crate::dataset::Scanner::batch_size`]) will be used. pub batch_size: Option, + /// The number of bytes to allow to queue up in the I/O buffer when scanning + /// the input fragments. If not specified then the default (see + /// [`crate::dataset::Scanner::io_buffer_size`]) will be used. + /// + /// Increasing this can avoid a deadlock that occurs when a single batch of + /// data is larger than the I/O buffer size. + pub io_buffer_size: Option, /// Whether to defer remapping indices during compaction. If true, indices will /// not be remapped during this compaction operation. Instead, the fragment reuse index /// is updated and will be used to perform remapping later. @@ -237,6 +244,7 @@ impl Default for CompactionOptions { num_threads: None, max_bytes_per_file: None, batch_size: None, + io_buffer_size: None, defer_index_remap: false, compaction_mode: None, enable_binary_copy: false, @@ -264,6 +272,7 @@ impl CompactionOptions { /// - `lance.compaction.materialize_deletions_threshold` /// - `lance.compaction.defer_index_remap` /// - `lance.compaction.batch_size` + /// - `lance.compaction.io_buffer_size` /// - `lance.compaction.compaction_mode` /// - `lance.compaction.binary_copy_read_batch_bytes` /// - `lance.compaction.max_source_fragments` @@ -347,6 +356,14 @@ impl CompactionOptions { )) })?); } + "io_buffer_size" => { + self.io_buffer_size = Some(value.parse().map_err(|_| { + Error::invalid_input(format!( + "Invalid value for {}: '{}' (expected a non-negative integer)", + key, value + )) + })?); + } "compaction_mode" => { self.compaction_mode = Some(CompactionMode::try_from(value.as_str())?); } @@ -1194,6 +1211,8 @@ async fn transform_blob_v2_batch( /// and preserve insertion order. /// - `batch_size`: Optional batch size; if provided, set it on the scanner to control /// read batching. +/// - `io_buffer_size`: Optional I/O buffer size in bytes; if provided, set it on the +/// scanner to control how much data is queued during reads. /// - `with_frags`: Whether to scan only the specified old fragments and force /// in-order reading. /// - `capture_row_ids`: When index remapping is needed, include and capture the @@ -1209,6 +1228,7 @@ async fn prepare_reader( dataset: &Dataset, fragments: &[Fragment], batch_size: Option, + io_buffer_size: Option, with_frags: bool, capture_row_ids: bool, ) -> Result<( @@ -1234,6 +1254,9 @@ async fn prepare_reader( if let Some(bs) = batch_size { scanner.batch_size(bs); } + if let Some(io_buffer_size) = io_buffer_size { + scanner.io_buffer_size(io_buffer_size); + } if with_frags { scanner .with_fragments(fragments.to_vec()) @@ -1515,6 +1538,7 @@ async fn rewrite_files( dataset.as_ref(), &fragments, options.batch_size, + options.io_buffer_size, true, needs_remapping, ) @@ -2636,6 +2660,57 @@ mod tests { assert_eq!(scanned_data, data); } + #[rstest] + #[tokio::test] + async fn test_compact_with_io_buffer_size( + #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)] + data_storage_version: LanceFileVersion, + ) { + // Compaction should succeed and produce correct results when an + // explicit io_buffer_size is provided via CompactionOptions. + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + + let data = sample_data(); + + // Create a table with 2 small fragments so there is something to compact. + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 5_000, + max_rows_per_group: 1_000, + data_storage_version: Some(data_storage_version), + ..Default::default() + }; + let mut dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 2); + + let options = CompactionOptions { + // A generous buffer so the read does not deadlock on large batches. + io_buffer_size: Some(256 * 1024 * 1024), + ..Default::default() + }; + let plan = plan_compaction(&dataset, &options).await.unwrap(); + assert_eq!(plan.tasks().len(), 1); + + let metrics = compact_files(&mut dataset, options, None).await.unwrap(); + assert_eq!(metrics.fragments_removed, 2); + assert_eq!(metrics.fragments_added, 1); + + // All rows are preserved after compaction. + let scanner = dataset.scan(); + let batches = scanner + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let scanned_data = concat_batches(&batches[0].schema(), &batches).unwrap(); + assert_eq!(scanned_data.num_rows(), data.num_rows()); + } + #[rstest] #[tokio::test] async fn test_compact_deletions( @@ -4683,6 +4758,10 @@ mod tests { "lance.compaction.batch_size".to_string(), "4096".to_string(), ), + ( + "lance.compaction.io_buffer_size".to_string(), + "1073741824".to_string(), + ), ( "lance.compaction.compaction_mode".to_string(), "try_binary_copy".to_string(), @@ -4701,6 +4780,7 @@ mod tests { assert!((opts.materialize_deletions_threshold - 0.25).abs() < f32::EPSILON); assert!(opts.defer_index_remap); assert_eq!(opts.batch_size, Some(4096)); + assert_eq!(opts.io_buffer_size, Some(1_073_741_824)); assert_eq!(opts.compaction_mode, Some(CompactionMode::TryBinaryCopy)); assert_eq!(opts.binary_copy_read_batch_bytes, Some(8_388_608)); }