Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! that stores tables as Lance datasets in a filesystem directory structure.

pub mod manifest;
pub mod manifest_feature_flags;

use arrow::array::Float32Array;
use arrow::record_batch::RecordBatchIterator;
Expand Down Expand Up @@ -719,6 +720,12 @@ impl DirectoryNamespaceBuilder {
.await
{
Ok(ns) => Some(Arc::new(ns)),
Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => {
// The manifest exists but was written with a feature flag this
// build does not understand. Refuse rather than silently
// degrading to a directory-listing view that ignores it.
return Err(e);
}
Err(e) => {
// Failed to initialize manifest namespace, fall back to directory listing only
log::warn!(
Expand Down Expand Up @@ -1412,6 +1419,11 @@ impl DirectoryNamespace {
}
return Ok(response);
}
Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => {
// An incompatible manifest must surface "please upgrade"
// rather than degrading to a directory-listing view.
return Err(e);
}
Err(_) if self.dir_listing_enabled && is_root_level => {
// Fall through to directory check only for single-level IDs
}
Expand Down Expand Up @@ -2650,6 +2662,11 @@ impl LanceNamespace for DirectoryNamespace {
{
match manifest_ns.table_exists(request.clone()).await {
Ok(()) => return Ok(()),
Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => {
// An incompatible manifest must surface "please upgrade"
// rather than degrading to a directory-listing view.
return Err(e);
}
Err(_) if self.dir_listing_enabled && is_root_level => {
// Fall through to directory check only for single-level IDs
}
Expand Down
185 changes: 172 additions & 13 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! This module provides a namespace implementation that uses a manifest table
//! to track tables and nested namespaces.

use super::manifest_feature_flags::{ensure_readable, ensure_writable};
use arrow::array::builder::{ListBuilder, StringBuilder};
use arrow::array::{Array, ListArray, RecordBatch, RecordBatchIterator, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
Expand Down Expand Up @@ -684,26 +685,35 @@ impl DatasetConsistencyWrapper {
/// Always reloads to ensure strong consistency.
///
/// # Errors
/// Returns early with the error from `reload()`, or with the error from
/// `ensure_readable` when the reloaded manifest's metadata is rejected.
// NOTE(review): this listing looks like a diff rendered without +/- markers.
// `Ok(DatasetReadGuard {` and `})` appear to be the pre-change lines that the
// `let guard = DatasetReadGuard { ... };` binding replaces — confirm against
// the real file before treating both forms as live code.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
// Reload before taking the read lock.
self.reload().await?;
Ok(DatasetReadGuard {
let guard = DatasetReadGuard {
guard: self.0.read().await,
})
};
// Refuse manifests written with a reader feature flag this build does
// not understand instead of misreading them.
ensure_readable(guard.metadata())?;
Ok(guard)
}

/// Reload the dataset and return a reference.
///
/// As shown here, the body performs the same steps as `get`: reload, take the
/// read lock, then validate the metadata with `ensure_readable`.
///
/// # Errors
/// Returns early with the error from `reload()` or from `ensure_readable`.
// NOTE(review): this listing looks like a diff rendered without +/- markers.
// `Ok(DatasetReadGuard {` and `})` appear to be the pre-change lines that the
// `let guard = DatasetReadGuard { ... };` binding replaces — confirm against
// the real file.
pub async fn get_refreshed(&self) -> Result<DatasetReadGuard<'_>> {
self.reload().await?;
Ok(DatasetReadGuard {
let guard = DatasetReadGuard {
guard: self.0.read().await,
})
};
// Same reader feature-flag gate that `get` applies before handing out the guard.
ensure_readable(guard.metadata())?;
Ok(guard)
}

/// Get a mutable reference to the dataset.
/// Always reloads to ensure strong consistency.
///
/// # Errors
/// Returns early with the error from `reload()`, from `ensure_readable`, or
/// from `ensure_writable`; in the latter two cases no write guard is returned.
// NOTE(review): this listing looks like a diff rendered without +/- markers.
// `Ok(DatasetWriteGuard {` and `})` appear to be the pre-change lines that the
// `let guard = DatasetWriteGuard { ... };` binding replaces — confirm against
// the real file.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
self.reload().await?;
Ok(DatasetWriteGuard {
let guard = DatasetWriteGuard {
guard: self.0.write().await,
})
};
// A writer must pass both gates: the reader check first, then the writer
// check, before the caller receives mutable access.
ensure_readable(guard.metadata())?;
ensure_writable(guard.metadata())?;
Ok(guard)
}

/// Provide a known latest version of the dataset.
Expand Down Expand Up @@ -1977,6 +1987,15 @@ impl ManifestNamespace {
}
}

/// Pre-flight check that this build is allowed to write the current
/// `__manifest`.
///
/// Mutating operations call this before producing any side effect (for example
/// writing table data), so a refused write leaves nothing orphaned behind. The
/// eventual commit checks again, which still catches a manifest that was
/// upgraded concurrently between this call and the commit.
///
/// # Errors
/// Propagates a failure to obtain the manifest dataset, or the error returned
/// by `ensure_writable` for its metadata.
async fn ensure_manifest_writable(&self) -> Result<()> {
    let manifest = self.manifest_dataset.get().await?;
    let metadata = manifest.metadata();
    ensure_writable(metadata)
}

async fn rewrite_manifest<M, F>(
&self,
operation: &str,
Expand All @@ -1996,6 +2015,9 @@ impl ManifestNamespace {
let dataset_guard = self.manifest_dataset.get_refreshed().await?;
let dataset = Arc::new(dataset_guard.clone());
drop(dataset_guard);
// Refuse to mutate a manifest written with a writer feature flag this
// build does not understand.
ensure_writable(dataset.metadata())?;
// Staged files, indices, the commit, and cleanup must all use the dataset's
// own object store (see `commit_manifest_overwrite`).
let object_store = dataset.object_store(None).await?;
Expand Down Expand Up @@ -2854,6 +2876,10 @@ impl ManifestNamespace {
.load()
.await;
if let Ok(mut dataset) = dataset_result {
// Reject a manifest written with a reader feature flag this build
// does not understand before touching it.
ensure_readable(dataset.metadata())?;

// Check if the object_id field has primary key metadata, migrate if not
let needs_pk_migration = dataset
.schema()
Expand All @@ -2865,6 +2891,9 @@ impl ManifestNamespace {
.unwrap_or(false);

if needs_pk_migration {
// This legacy migration writes to the manifest, so confirm this
// build is allowed to write the current format first.
ensure_writable(dataset.metadata())?;
log::info!("Migrating __manifest table to add primary key metadata on object_id");
dataset
.update_field_metadata()
Expand Down Expand Up @@ -2894,15 +2923,24 @@ impl ManifestNamespace {
.map(|v| v != "true")
.unwrap_or(true);

if needs_flag
&& let Err(e) = dataset
// Persisting this flag commits a new manifest version, so it must
// respect the writer feature-flag check. Degrade gracefully on an
// unwritable manifest (don't fail the open) so reads still work.
if needs_flag {
if let Err(e) = ensure_writable(dataset.metadata()) {
log::warn!(
"Not persisting table_version_storage_enabled on an unwritable __manifest: {:?}",
e
);
} else if let Err(e) = dataset
.update_metadata([("table_version_storage_enabled", "true")])
.await
{
log::warn!(
"Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
e
);
{
log::warn!(
"Failed to persist table_version_storage_enabled flag in __manifest: {:?}",
e
);
}
}
}

Expand Down Expand Up @@ -3295,6 +3333,10 @@ impl LanceNamespace for ManifestNamespace {
let (namespace, table_name) = Self::split_object_id(table_id);
let object_id = Self::build_object_id(&namespace, &table_name);

// Refuse before writing any table data if this build cannot write the
// manifest, so a refused create leaves no orphaned dataset behind.
self.ensure_manifest_writable().await?;

let existing_table = self.query_manifest_for_table(&object_id).await?;
let existing_has_manifests = if let Some(existing_table) = &existing_table {
Some(
Expand Down Expand Up @@ -4256,6 +4298,123 @@ mod tests {
buffer
}

/// Test helper: writes `key = value` into the table metadata of the
/// `__manifest` dataset located under `temp_path`, opening the dataset
/// directly. This simulates a future Lance client that persisted a feature
/// flag.
///
/// Panics if the dataset cannot be loaded or the metadata update fails.
async fn set_manifest_table_metadata(temp_path: &str, key: &str, value: &str) {
    use lance::dataset::builder::DatasetBuilder;

    let manifest_uri = format!("{}/{}", temp_path, MANIFEST_TABLE_NAME);
    let mut manifest = DatasetBuilder::from_uri(manifest_uri)
        .load()
        .await
        .unwrap();

    let entry = (key, value);
    manifest.update_metadata([entry]).await.unwrap();
}

/// Test helper: builds a namespace rooted at `temp_path` via
/// `DirectoryNamespaceBuilder` and creates one table with id `["t1"]` from the
/// shared IPC test data.
///
/// Panics if building the namespace or creating the table fails.
async fn create_namespace_with_one_table(temp_path: &str) {
    let namespace = DirectoryNamespaceBuilder::new(temp_path)
        .build()
        .await
        .unwrap();

    let mut request = CreateTableRequest::new();
    request.id = Some(vec![String::from("t1")]);
    let payload = Bytes::from(create_test_ipc_data());

    namespace.create_table(request, payload).await.unwrap();
}

/// This is a forward-compatibility checker only: it must not set any feature
/// flag, so existing clients keep treating the manifest as compatible.
#[tokio::test]
async fn test_manifest_has_no_feature_flags_by_default() {
use lance::dataset::builder::DatasetBuilder;
let temp_dir = TempStdDir::default();
let temp_path = temp_dir.to_str().unwrap();
create_namespace_with_one_table(temp_path).await;

let ds = DatasetBuilder::from_uri(format!("{}/{}", temp_path, MANIFEST_TABLE_NAME))
.load()
.await
.unwrap();
assert!(
!ds.metadata()
.contains_key(crate::dir::manifest_feature_flags::READER_FEATURE_FLAGS_KEY)
);
assert!(
!ds.metadata()
.contains_key(crate::dir::manifest_feature_flags::WRITER_FEATURE_FLAGS_KEY)
);
}

/// Opening a catalog whose manifest carries an unknown reader feature flag
/// must fail with a clear "please upgrade" error instead of silently degrading
/// to directory listing.
#[tokio::test]
async fn test_unknown_reader_flag_blocks_access() {
    use crate::dir::manifest_feature_flags::READER_FEATURE_FLAGS_KEY;

    let temp_dir = TempStdDir::default();
    let root = temp_dir.to_str().unwrap();
    create_namespace_with_one_table(root).await;

    // Stamp the reader flag key directly on the manifest, as a newer client
    // would have done.
    set_manifest_table_metadata(root, READER_FEATURE_FLAGS_KEY, "1").await;

    let open_result = DirectoryNamespaceBuilder::new(root).build().await;
    let err =
        open_result.expect_err("opening a manifest with an unknown reader flag should fail");

    let mentions_upgrade = err.to_string().to_lowercase().contains("upgrade");
    assert!(mentions_upgrade, "expected an upgrade error, got: {err}");
}

/// With only an unknown writer feature flag set, the catalog must still open
/// and list tables, while a write is refused with an "upgrade" error and
/// leaves the root directory's entries unchanged.
#[tokio::test]
async fn test_unknown_writer_flag_blocks_writes_but_allows_reads() {
    use crate::dir::manifest_feature_flags::WRITER_FEATURE_FLAGS_KEY;

    let temp_dir = TempStdDir::default();
    let root = temp_dir.to_str().unwrap();
    create_namespace_with_one_table(root).await;
    set_manifest_table_metadata(root, WRITER_FEATURE_FLAGS_KEY, "1").await;

    // Read path: opening and listing still work.
    let namespace = DirectoryNamespaceBuilder::new(root)
        .build()
        .await
        .expect("reads should still be allowed with only a writer flag set");
    let mut list_request = ListTablesRequest::new();
    list_request.id = Some(Vec::new());
    let listing = namespace.list_tables(list_request).await.unwrap();
    assert_eq!(listing.tables.len(), 1);

    // Write path: snapshot the directory so an orphaned table dataset left by
    // the refused create would show up as a difference.
    let entries_before = dir_entry_names(root);
    let mut create_request = CreateTableRequest::new();
    create_request.id = Some(vec![String::from("t2")]);
    let payload = Bytes::from(create_test_ipc_data());
    let create_result = namespace.create_table(create_request, payload).await;
    let err = create_result.expect_err("writing through an unknown writer flag should fail");

    let mentions_upgrade = err.to_string().to_lowercase().contains("upgrade");
    assert!(mentions_upgrade, "expected an upgrade error, got: {err}");

    let entries_after = dir_entry_names(root);
    assert_eq!(
        entries_before, entries_after,
        "a refused create_table must not create an orphaned table directory"
    );
}

/// Collects the names of the direct entries of the directory at `path` into
/// an ordered set.
///
/// Names are converted with `to_string_lossy`, so any non-UTF-8 portion is
/// replaced rather than rejected.
///
/// # Panics
/// Panics if the directory cannot be read or if reading an entry fails.
fn dir_entry_names(path: &str) -> std::collections::BTreeSet<String> {
    let mut names = std::collections::BTreeSet::new();
    for entry in std::fs::read_dir(path).unwrap() {
        let file_name = entry.unwrap().file_name();
        names.insert(file_name.to_string_lossy().into_owned());
    }
    names
}

#[tokio::test]
async fn test_manifest_rewrite_preserves_utf8_metadata_and_base_objects() {
let temp_dir = TempStdDir::default();
Expand Down
Loading
Loading