Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 48 additions & 34 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,16 +1209,9 @@ impl FileFragment {
/// If there are no deleted rows, this is equal to the number of rows in the
/// fragment.
pub async fn physical_rows(&self) -> Result<usize> {
if self.metadata.files.is_empty() {
return Err(Error::not_found(format!(
"Fragment {} does not contain any data",
self.id()
)));
};

// Early versions that did not write the writer version also could write
// incorrect `physical_row` values. So if we don't have a writer version,
// we should not used the cached value. On write, we update the values
// incorrect `physical_rows` values. So if we don't have a writer version,
// we should not use the cached value. On write, we update the values
// in the manifest, fixing the issue for future reads.
// See: https://github.com/lance-format/lance/issues/1531
if self.dataset.manifest.writer_version.is_some()
Expand All @@ -1227,6 +1220,13 @@ impl FileFragment {
return Ok(physical_rows);
}

if self.metadata.files.is_empty() {
return Err(Error::not_found(format!(
"Fragment {} does not contain any data",
self.id()
)));
};

// Just open any file. All of them should have same size.
let some_file = &self.metadata.files[0];
let reader = self
Expand Down Expand Up @@ -1283,9 +1283,9 @@ impl FileFragment {
}
}

if self.metadata.files.iter().any(|f| f.is_legacy_file())
!= self.metadata.files.iter().all(|f| f.is_legacy_file())
{
let has_legacy_files = self.metadata.files.iter().any(|f| f.is_legacy_file());
let has_non_legacy_files = self.metadata.files.iter().any(|f| !f.is_legacy_file());
if has_legacy_files && has_non_legacy_files {
return Err(Error::corrupt_file(
self.dataset
.data_file_dir(&self.metadata.files[0])?
Expand Down Expand Up @@ -1318,35 +1318,49 @@ impl FileFragment {
let (get_lengths, deletion_vector) = join!(get_lengths, deletion_vector);

let get_lengths = get_lengths?;
let expected_length = get_lengths.first().unwrap_or(&0);
for (length, data_file) in get_lengths.iter().zip(self.metadata.files.iter()) {
if length != expected_length {
let expected_length = if let Some(first_length) = get_lengths.first() {
for (length, data_file) in get_lengths.iter().zip(self.metadata.files.iter()) {
if length != first_length {
let path = self
.dataset
.data_file_dir(data_file)?
.join(data_file.path.as_str());
return Err(Error::corrupt_file(
path,
format!(
"data file has incorrect length. Expected: {} Got: {}",
first_length, length
),
));
}
}
if let Some(physical_rows) = self.metadata.physical_rows
&& physical_rows != *first_length
{
let path = self
.dataset
.data_file_dir(data_file)?
.join(data_file.path.as_str());
.data_file_dir(&self.metadata.files[0])?
.join(self.metadata.files[0].path.as_str());
return Err(Error::corrupt_file(
path,
format!(
"data file has incorrect length. Expected: {} Got: {}",
expected_length, length
"Fragment metadata has incorrect physical_rows. Actual: {} Metadata: {}",
first_length, physical_rows
),
));
}
}
if let Some(physical_rows) = self.metadata.physical_rows
&& physical_rows != *expected_length
{
return Err(Error::corrupt_file(
self.dataset
.data_file_dir(&self.metadata.files[0])?
.join(self.metadata.files[0].path.as_str()),
format!(
"Fragment metadata has incorrect physical_rows. Actual: {} Metadata: {}",
expected_length, physical_rows
),
));
}
*first_length
} else {
self.metadata.physical_rows.ok_or_else(|| {
Error::corrupt_file(
self.dataset.base.clone(),
format!(
"Fragment {} does not contain any data files and is missing physical_rows",
self.id()
),
)
})?
};

if let Some(deletion_vector) = deletion_vector? {
if let Some(num_deletions) = self
Expand All @@ -1372,7 +1386,7 @@ impl FileFragment {
}

for offset in deletion_vector.iter() {
if offset >= *expected_length as u32 {
if offset >= expected_length as u32 {
let deletion_file_meta = self.metadata.deletion_file.as_ref().unwrap();
return Err(Error::corrupt_file(
deletion_file_path(
Expand Down
130 changes: 128 additions & 2 deletions rust/lance/src/dataset/schema_evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,10 @@ mod test {
use std::collections::HashMap;
use std::sync::Mutex;

use crate::dataset::WriteParams;
use crate::dataset::{WriteMode, WriteParams};
use arrow_array::{
ArrayRef, Int32Array, ListArray, RecordBatchIterator, StringArray, StructArray,
Array, ArrayRef, Int32Array, Int64Array, ListArray, RecordBatchIterator, StringArray,
StructArray,
};

use super::*;
Expand Down Expand Up @@ -1220,6 +1221,131 @@ mod test {
Ok(())
}

async fn dataset_with_only_all_null_column(test_uri: &str, num_rows: usize) -> Result<Dataset> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"id",
DataType::Int64,
false,
)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from_iter_values(
(1..=num_rows).map(|value| value as i64),
))],
)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(LanceFileVersion::Stable),
..Default::default()
}),
)
.await?;

dataset
.add_columns(
NewColumnTransform::AllNulls(Arc::new(ArrowSchema::new(vec![ArrowField::new(
"x",
DataType::Int64,
true,
)]))),
None,
None,
)
.await?;
dataset.drop_columns(&["id"]).await?;
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert!(fragments[0].metadata.files.is_empty());
assert_eq!(fragments[0].metadata.physical_rows, Some(num_rows));
dataset.validate().await?;

Ok(dataset)
}

#[tokio::test]
async fn test_scan_all_null_column_after_dropping_only_physical_column() -> Result<()> {
let num_rows = 9;
let test_dir = TempStrDir::default();
let dataset = dataset_with_only_all_null_column(&test_dir, num_rows).await?;

let data = dataset.scan().try_into_batch().await?;

let expected_schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Int64, true)]);
assert_eq!(data.schema().as_ref(), &expected_schema);
assert_eq!(data.num_rows(), num_rows);

let x = data["x"].as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(x.len(), num_rows);
assert_eq!(x.null_count(), num_rows);

Ok(())
}

#[tokio::test]
async fn test_append_readded_column_after_dropping_only_physical_column() -> Result<()> {
let num_existing_rows = 9;
let test_dir = TempStrDir::default();
let mut dataset = dataset_with_only_all_null_column(&test_dir, num_existing_rows).await?;

dataset
.add_columns(
NewColumnTransform::AllNulls(Arc::new(ArrowSchema::new(vec![ArrowField::new(
"id",
DataType::Int64,
true,
)]))),
None,
None,
)
.await?;

let append_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("x", DataType::Int64, true),
ArrowField::new("id", DataType::Int64, true),
]));
let append_batch = RecordBatch::try_new(
append_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![None::<i64>])),
Arc::new(Int64Array::from(vec![Some(42_i64)])),
],
)?;
dataset
.append(
RecordBatchIterator::new(vec![Ok(append_batch)], append_schema),
Some(WriteParams {
mode: WriteMode::Append,
..Default::default()
}),
)
.await?;

let data = dataset.scan().try_into_batch().await?;

let expected_schema = ArrowSchema::new(vec![
ArrowField::new("x", DataType::Int64, true),
ArrowField::new("id", DataType::Int64, true),
]);
assert_eq!(data.schema().as_ref(), &expected_schema);
assert_eq!(data.num_rows(), num_existing_rows + 1);

let x = data["x"].as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(x.len(), num_existing_rows + 1);
assert_eq!(x.null_count(), num_existing_rows + 1);

let id = data["id"].as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(id.len(), num_existing_rows + 1);
for row_idx in 0..num_existing_rows {
assert!(id.is_null(row_idx));
}
assert_eq!(id.value(num_existing_rows), 42);

Ok(())
}

#[tokio::test]
async fn test_add_column_all_nulls_legacy() -> Result<()> {
let num_rows = 100;
Expand Down
Loading