From 90f5f60f29dd7aa47fb38691f1a93e996262f239 Mon Sep 17 00:00:00 2001 From: Nikolay Skovorodin Date: Thu, 11 Jun 2026 19:04:50 +0700 Subject: [PATCH] fix: support scans of file-less projected fragments Trust modern Fragment.physical_rows before rejecting fragments with no data files, allowing all-null projected columns to synthesize rows after dropping the only physical column. Update fragment validation to use physical_rows for row-bearing fragments without visible data files and cover scan/append regressions. --- rust/lance/src/dataset/fragment.rs | 82 +++++++------ rust/lance/src/dataset/schema_evolution.rs | 130 ++++++++++++++++++++- 2 files changed, 176 insertions(+), 36 deletions(-) diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 11851e8846e..80b92abadcc 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1209,16 +1209,9 @@ impl FileFragment { /// If there are no deleted rows, this is equal to the number of rows in the /// fragment. pub async fn physical_rows(&self) -> Result { - if self.metadata.files.is_empty() { - return Err(Error::not_found(format!( - "Fragment {} does not contain any data", - self.id() - ))); - }; - // Early versions that did not write the writer version also could write - // incorrect `physical_row` values. So if we don't have a writer version, - // we should not used the cached value. On write, we update the values + // incorrect `physical_rows` values. So if we don't have a writer version, + // we should not use the cached value. On write, we update the values // in the manifest, fixing the issue for future reads. // See: https://github.com/lance-format/lance/issues/1531 if self.dataset.manifest.writer_version.is_some() @@ -1227,6 +1220,13 @@ impl FileFragment { return Ok(physical_rows); } + if self.metadata.files.is_empty() { + return Err(Error::not_found(format!( + "Fragment {} does not contain any data", + self.id() + ))); + }; + // Just open any file. All of them should have same size. let some_file = &self.metadata.files[0]; let reader = self @@ -1283,9 +1283,9 @@ impl FileFragment { } } - if self.metadata.files.iter().any(|f| f.is_legacy_file()) - != self.metadata.files.iter().all(|f| f.is_legacy_file()) - { + let has_legacy_files = self.metadata.files.iter().any(|f| f.is_legacy_file()); + let has_non_legacy_files = self.metadata.files.iter().any(|f| !f.is_legacy_file()); + if has_legacy_files && has_non_legacy_files { return Err(Error::corrupt_file( self.dataset .data_file_dir(&self.metadata.files[0])? @@ -1318,35 +1318,49 @@ impl FileFragment { let (get_lengths, deletion_vector) = join!(get_lengths, deletion_vector); let get_lengths = get_lengths?; - let expected_length = get_lengths.first().unwrap_or(&0); - for (length, data_file) in get_lengths.iter().zip(self.metadata.files.iter()) { - if length != expected_length { + let expected_length = if let Some(first_length) = get_lengths.first() { + for (length, data_file) in get_lengths.iter().zip(self.metadata.files.iter()) { + if length != first_length { + let path = self + .dataset + .data_file_dir(data_file)? + .join(data_file.path.as_str()); + return Err(Error::corrupt_file( + path, + format!( + "data file has incorrect length. Expected: {} Got: {}", + first_length, length + ), + )); + } + } + if let Some(physical_rows) = self.metadata.physical_rows + && physical_rows != *first_length + { let path = self .dataset - .data_file_dir(data_file)? - .join(data_file.path.as_str()); + .data_file_dir(&self.metadata.files[0])? + .join(self.metadata.files[0].path.as_str()); return Err(Error::corrupt_file( path, format!( - "data file has incorrect length. Expected: {} Got: {}", - expected_length, length + "Fragment metadata has incorrect physical_rows. Actual: {} Metadata: {}", + first_length, physical_rows ), )); } - } - if let Some(physical_rows) = self.metadata.physical_rows - && physical_rows != *expected_length - { - return Err(Error::corrupt_file( - self.dataset - .data_file_dir(&self.metadata.files[0])? - .join(self.metadata.files[0].path.as_str()), - format!( - "Fragment metadata has incorrect physical_rows. Actual: {} Metadata: {}", - expected_length, physical_rows - ), - )); - } + *first_length + } else { + self.metadata.physical_rows.ok_or_else(|| { + Error::corrupt_file( + self.dataset.base.clone(), + format!( + "Fragment {} does not contain any data files and is missing physical_rows", + self.id() + ), + ) + })? + }; if let Some(deletion_vector) = deletion_vector? { if let Some(num_deletions) = self @@ -1372,7 +1386,7 @@ impl FileFragment { } for offset in deletion_vector.iter() { - if offset >= *expected_length as u32 { + if offset >= expected_length as u32 { let deletion_file_meta = self.metadata.deletion_file.as_ref().unwrap(); return Err(Error::corrupt_file( deletion_file_path( diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index f5d792979df..6b73d4c91a6 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -765,9 +765,10 @@ mod test { use std::collections::HashMap; use std::sync::Mutex; - use crate::dataset::WriteParams; + use crate::dataset::{WriteMode, WriteParams}; use arrow_array::{ - ArrayRef, Int32Array, ListArray, RecordBatchIterator, StringArray, StructArray, + Array, ArrayRef, Int32Array, Int64Array, ListArray, RecordBatchIterator, StringArray, + StructArray, }; use super::*; @@ -1220,6 +1221,131 @@ mod test { Ok(()) } + async fn dataset_with_only_all_null_column(test_uri: &str, num_rows: usize) -> Result { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int64, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from_iter_values( + (1..=num_rows).map(|value| value as i64), + ))], + )?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Stable), + ..Default::default() + }), + ) + .await?; + + dataset + .add_columns( + NewColumnTransform::AllNulls(Arc::new(ArrowSchema::new(vec![ArrowField::new( + "x", + DataType::Int64, + true, + )]))), + None, + None, + ) + .await?; + dataset.drop_columns(&["id"]).await?; + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + assert!(fragments[0].metadata.files.is_empty()); + assert_eq!(fragments[0].metadata.physical_rows, Some(num_rows)); + dataset.validate().await?; + + Ok(dataset) + } + + #[tokio::test] + async fn test_scan_all_null_column_after_dropping_only_physical_column() -> Result<()> { + let num_rows = 9; + let test_dir = TempStrDir::default(); + let dataset = dataset_with_only_all_null_column(&test_dir, num_rows).await?; + + let data = dataset.scan().try_into_batch().await?; + + let expected_schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Int64, true)]); + assert_eq!(data.schema().as_ref(), &expected_schema); + assert_eq!(data.num_rows(), num_rows); + + let x = data["x"].as_any().downcast_ref::().unwrap(); + assert_eq!(x.len(), num_rows); + assert_eq!(x.null_count(), num_rows); + + Ok(()) + } + + #[tokio::test] + async fn test_append_readded_column_after_dropping_only_physical_column() -> Result<()> { + let num_existing_rows = 9; + let test_dir = TempStrDir::default(); + let mut dataset = dataset_with_only_all_null_column(&test_dir, num_existing_rows).await?; + + dataset + .add_columns( + NewColumnTransform::AllNulls(Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int64, + true, + )]))), + None, + None, + ) + .await?; + + let append_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("x", DataType::Int64, true), + ArrowField::new("id", DataType::Int64, true), + ])); + let append_batch = RecordBatch::try_new( + append_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![None::])), + Arc::new(Int64Array::from(vec![Some(42_i64)])), + ], + )?; + dataset + .append( + RecordBatchIterator::new(vec![Ok(append_batch)], append_schema), + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await?; + + let data = dataset.scan().try_into_batch().await?; + + let expected_schema = ArrowSchema::new(vec![ + ArrowField::new("x", DataType::Int64, true), + ArrowField::new("id", DataType::Int64, true), + ]); + assert_eq!(data.schema().as_ref(), &expected_schema); + assert_eq!(data.num_rows(), num_existing_rows + 1); + + let x = data["x"].as_any().downcast_ref::().unwrap(); + assert_eq!(x.len(), num_existing_rows + 1); + assert_eq!(x.null_count(), num_existing_rows + 1); + + let id = data["id"].as_any().downcast_ref::().unwrap(); + assert_eq!(id.len(), num_existing_rows + 1); + for row_idx in 0..num_existing_rows { + assert!(id.is_null(row_idx)); + } + assert_eq!(id.value(num_existing_rows), 42); + + Ok(()) + } + #[tokio::test] async fn test_add_column_all_nulls_legacy() -> Result<()> { let num_rows = 100;