From 6384d1b1a25099fe6e3ac9bcb769d184226f2aac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9D=A8?= Date: Thu, 11 Jun 2026 08:03:04 +0000 Subject: [PATCH] fix: handle empty batch from deletion in add_columns_from_stream When an entire read batch has been deleted (e.g., all rows in a batch fall within the deletion vector), the updater yields a 0-row batch. The old code would then skip the inner loop, leaving `batches` empty, causing `concat_batches(&batches[0]..)` to panic with 'index out of bounds: the len is 0 but the index is 0'. This fix detects the empty batch case and feeds an empty batch back to the updater to keep it in sync, then continues to the next batch. Adds a regression test that creates a dataset with 105 rows, deletes the trailing 5 rows (forming a fully-deleted trailing batch when read with batch_size=50), and verifies add_columns succeeds. Fixes the panic reported in production when using merge_columns on a dataset with deletion files where deleted rows cluster at batch boundaries. --- rust/lance/src/dataset/schema_evolution.rs | 74 ++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index f5d792979df..c976f8d4aec 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -496,6 +496,17 @@ async fn add_columns_from_stream( debug_assert_eq!(batch.num_columns(), 1); let mut rows_remaining = batch.num_rows(); + // The updater yields an empty batch when every row in a read batch + // has been deleted (e.g. a whole batch falls within the deletion + // vector). There is nothing to pull from the stream in that case, so + // feed an empty batch back to keep the updater in sync and continue. + if rows_remaining == 0 { + updater + .update(RecordBatch::new_empty(stream.schema())) + .await?; + continue; + } + let mut batches = Vec::new(); while rows_remaining > 0 { @@ -864,6 +875,69 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_add_columns_with_fully_deleted_batch() -> Result<()> { + // Regression test: when an entire read batch has been deleted, the + // updater yields a 0-row batch. The inner loop then never runs and + // `batches` stays empty, so `concat_batches(&batches[0]..)` used to + // panic with "index out of bounds: the len is 0 but the index is 0". + // + // A single fragment holds 105 rows; deleting the trailing 5 rows means + // that, when read with batch_size=50, the third batch [100..105) is + // fully filtered out and produces an empty batch. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..105))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 200, // keep all rows in a single fragment + ..Default::default() + }), + ) + .await?; + + // Delete the entire trailing batch [100..105). + dataset.delete("i >= 100").await?; + assert_eq!(dataset.count_rows(None).await?, 100); + + let new_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "j", + DataType::Int32, + false, + )])); + let new_batch = RecordBatch::try_new( + new_schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..100))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(new_batch)], new_schema.clone()); + + // Read with batch_size=50 so the deleted trailing rows form a full empty batch. + dataset + .add_columns(NewColumnTransform::Reader(Box::new(reader)), None, Some(50)) + .await?; + + let data = dataset.scan().try_into_batch().await?; + assert_eq!(data.num_rows(), 100); + assert_eq!( + data.column_by_name("j").unwrap().as_ref(), + &Int32Array::from_iter_values(0..100) + ); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_append_columns_udf(