Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions rust/lance/src/dataset/schema_evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,17 @@ async fn add_columns_from_stream(
debug_assert_eq!(batch.num_columns(), 1);
let mut rows_remaining = batch.num_rows();

// The updater yields an empty batch when every row in a read batch
// has been deleted (e.g. a whole batch falls within the deletion
// vector). There is nothing to pull from the stream in that case, so
// feed an empty batch back to keep the updater in sync and continue.
if rows_remaining == 0 {
updater
.update(RecordBatch::new_empty(stream.schema()))
.await?;
continue;
}

let mut batches = Vec::new();

while rows_remaining > 0 {
Expand Down Expand Up @@ -864,6 +875,69 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_add_columns_with_fully_deleted_batch() -> Result<()> {
// Regression test: when an entire read batch has been deleted, the
// updater yields a 0-row batch. The inner loop then never runs and
// `batches` stays empty, so `concat_batches(&batches[0]..)` used to
// panic with "index out of bounds: the len is 0 but the index is 0".
//
// A single fragment holds 105 rows; deleting the trailing 5 rows means
// that, when read with batch_size=50, the third batch [100..105) is
// fully filtered out and produces an empty batch.
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..105))],
)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());

let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
max_rows_per_file: 200, // keep all rows in a single fragment
..Default::default()
}),
)
.await?;

// Delete the entire trailing batch [100..105).
dataset.delete("i >= 100").await?;
assert_eq!(dataset.count_rows(None).await?, 100);

let new_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"j",
DataType::Int32,
false,
)]));
let new_batch = RecordBatch::try_new(
new_schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..100))],
)?;
let reader = RecordBatchIterator::new(vec![Ok(new_batch)], new_schema.clone());

// Read with batch_size=50 so the deleted trailing rows form a full empty batch.
dataset
.add_columns(NewColumnTransform::Reader(Box::new(reader)), None, Some(50))
.await?;

let data = dataset.scan().try_into_batch().await?;
assert_eq!(data.num_rows(), 100);
assert_eq!(
data.column_by_name("j").unwrap().as_ref(),
&Int32Array::from_iter_values(0..100)
);

Ok(())
}

#[rstest]
#[tokio::test]
async fn test_append_columns_udf(
Expand Down
Loading