diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index f5d792979df..c976f8d4aec 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -496,6 +496,17 @@ async fn add_columns_from_stream(
             debug_assert_eq!(batch.num_columns(), 1);
             let mut rows_remaining = batch.num_rows();
 
+            // The updater yields a 0-row batch when every row of a read batch has
+            // been deleted (e.g. the whole batch falls within the deletion vector).
+            // Nothing is pulled from the stream for it; without this guard `batches`
+            // would stay empty. Feed back an empty batch to keep the updater in sync.
+            if rows_remaining == 0 {
+                updater
+                    .update(RecordBatch::new_empty(stream.schema()))
+                    .await?;
+                continue;
+            }
+
             let mut batches = Vec::new();
 
             while rows_remaining > 0 {
@@ -864,6 +875,69 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_add_columns_with_fully_deleted_batch() -> Result<()> {
+        // Regression test for adding columns when an entire read batch is deleted:
+        // the updater then yields a 0-row batch, the inner pull loop never runs,
+        // `batches` stays empty, and `concat_batches(&batches[0]..)` used to
+        // panic with "index out of bounds: the len is 0 but the index is 0".
+        //
+        // Setup: one fragment of 105 rows (i = 0..105) with rows i >= 100 deleted.
+        // Read with batch_size=50, the batches are [0..50), [50..100), [100..105);
+        // the third is fully filtered out and comes back empty.
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "i",
+            DataType::Int32,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..105))],
+        )?;
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+
+        let test_dir = TempStrDir::default();
+        let test_uri = &test_dir;
+        let mut dataset = Dataset::write(
+            reader,
+            test_uri,
+            Some(WriteParams {
+                max_rows_per_file: 200, // 200 > 105, so all rows land in one fragment
+                ..Default::default()
+            }),
+        )
+        .await?;
+
+        // Delete every row of the trailing read batch [100..105); 100 rows remain.
+        dataset.delete("i >= 100").await?;
+        assert_eq!(dataset.count_rows(None).await?, 100);
+
+        let new_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "j",
+            DataType::Int32,
+            false,
+        )]));
+        let new_batch = RecordBatch::try_new(
+            new_schema.clone(),
+            vec![Arc::new(Int32Array::from_iter_values(0..100))],
+        )?;
+        let reader = RecordBatchIterator::new(vec![Ok(new_batch)], new_schema.clone());
+
+        // Read with batch_size=50 so the deleted rows [100..105) form a read batch of their own.
+        dataset
+            .add_columns(NewColumnTransform::Reader(Box::new(reader)), None, Some(50))
+            .await?;
+
+        let data = dataset.scan().try_into_batch().await?;
+        assert_eq!(data.num_rows(), 100);
+        assert_eq!(
+            data.column_by_name("j").unwrap().as_ref(),
+            &Int32Array::from_iter_values(0..100)
+        );
+
+        Ok(())
+    }
+
     #[rstest]
     #[tokio::test]
     async fn test_append_columns_udf(