From 3566d753c4bbebbd5ed9ae7d8937370370bf40d5 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Thu, 11 Jun 2026 16:59:48 -0700 Subject: [PATCH] feat(dir-catalog): add reader/writer feature flags to __manifest Add forward-compatibility infrastructure to the directory-catalog __manifest dataset, mirroring the Lance table format's reader/writer feature flags but at the catalog-manifest layer. Two u64 bitmasks are persisted in the __manifest dataset's table_metadata; a build refuses to read or write a manifest that sets a flag it does not understand (clean "please upgrade" error) instead of misreading it. Absent flags parse as 0, so every existing manifest stays compatible. This is the mechanism only: no manifest feature is defined yet, so the known masks are 0 and nothing is ever set, i.e. zero behavior change. The first format change that needs forward-compatibility protection adds its bit and stamps it on write. Also stop the directory catalog from silently degrading to directory listing when the manifest is incompatible: build() and the per-op fallbacks propagate the incompatibility instead of masking it, so the check cannot be bypassed. --- rust/lance-namespace-impls/src/dir.rs | 17 ++ .../lance-namespace-impls/src/dir/manifest.rs | 185 +++++++++++++++-- .../src/dir/manifest_feature_flags.rs | 194 ++++++++++++++++++ 3 files changed, 383 insertions(+), 13 deletions(-) create mode 100644 rust/lance-namespace-impls/src/dir/manifest_feature_flags.rs diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 6adc233d8a7..c2b4901ddcf 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -7,6 +7,7 @@ //! that stores tables as Lance datasets in a filesystem directory structure. pub mod manifest; +pub mod manifest_feature_flags; use arrow::array::Float32Array; use arrow::record_batch::RecordBatchIterator; @@ -719,6 +720,12 @@ impl DirectoryNamespaceBuilder { .await { Ok(ns) => Some(Arc::new(ns)), + Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => { + // The manifest exists but was written with a feature flag this + // build does not understand. Refuse rather than silently + // degrading to a directory-listing view that ignores it. + return Err(e); + } Err(e) => { // Failed to initialize manifest namespace, fall back to directory listing only log::warn!( @@ -1412,6 +1419,11 @@ impl DirectoryNamespace { } return Ok(response); } + Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => { + // An incompatible manifest must surface "please upgrade" + // rather than degrading to a directory-listing view. + return Err(e); + } Err(_) if self.dir_listing_enabled && is_root_level => { // Fall through to directory check only for single-level IDs } @@ -2650,6 +2662,11 @@ impl LanceNamespace for DirectoryNamespace { { match manifest_ns.table_exists(request.clone()).await { Ok(()) => return Ok(()), + Err(e) if manifest_feature_flags::is_incompatible_manifest_error(&e) => { + // An incompatible manifest must surface "please upgrade" + // rather than degrading to a directory-listing view. + return Err(e); + } Err(_) if self.dir_listing_enabled && is_root_level => { // Fall through to directory check only for single-level IDs } diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 067239b8765..5005548df97 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -6,6 +6,7 @@ //! This module provides a namespace implementation that uses a manifest table //! to track tables and nested namespaces. +use super::manifest_feature_flags::{ensure_readable, ensure_writable}; use arrow::array::builder::{ListBuilder, StringBuilder}; use arrow::array::{Array, ListArray, RecordBatch, RecordBatchIterator, StringArray, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef}; @@ -684,26 +685,35 @@ impl DatasetConsistencyWrapper { /// Always reloads to ensure strong consistency. pub async fn get(&self) -> Result> { self.reload().await?; - Ok(DatasetReadGuard { + let guard = DatasetReadGuard { guard: self.0.read().await, - }) + }; + // Refuse manifests written with a reader feature flag this build does + // not understand instead of misreading them. + ensure_readable(guard.metadata())?; + Ok(guard) } /// Reload the dataset and return a reference. pub async fn get_refreshed(&self) -> Result> { self.reload().await?; - Ok(DatasetReadGuard { + let guard = DatasetReadGuard { guard: self.0.read().await, - }) + }; + ensure_readable(guard.metadata())?; + Ok(guard) } /// Get a mutable reference to the dataset. /// Always reloads to ensure strong consistency. pub async fn get_mut(&self) -> Result> { self.reload().await?; - Ok(DatasetWriteGuard { + let guard = DatasetWriteGuard { guard: self.0.write().await, - }) + }; + ensure_readable(guard.metadata())?; + ensure_writable(guard.metadata())?; + Ok(guard) } /// Provide a known latest version of the dataset. @@ -1977,6 +1987,15 @@ impl ManifestNamespace { } } + /// Validate that this build can write the current `__manifest` before a + /// mutating operation performs any side effect (e.g. writing table data), so + /// a refused write leaves nothing orphaned behind. The eventual commit + /// re-checks, so a concurrent upgrade in between is still caught. + async fn ensure_manifest_writable(&self) -> Result<()> { + let dataset_guard = self.manifest_dataset.get().await?; + ensure_writable(dataset_guard.metadata()) + } + async fn rewrite_manifest( &self, operation: &str, @@ -1996,6 +2015,9 @@ impl ManifestNamespace { let dataset_guard = self.manifest_dataset.get_refreshed().await?; let dataset = Arc::new(dataset_guard.clone()); drop(dataset_guard); + // Refuse to mutate a manifest written with a writer feature flag this + // build does not understand. + ensure_writable(dataset.metadata())?; // Staged files, indices, the commit, and cleanup must all use the dataset's // own object store (see `commit_manifest_overwrite`). let object_store = dataset.object_store(None).await?; @@ -2854,6 +2876,10 @@ impl ManifestNamespace { .load() .await; if let Ok(mut dataset) = dataset_result { + // Reject a manifest written with a reader feature flag this build + // does not understand before touching it. + ensure_readable(dataset.metadata())?; + // Check if the object_id field has primary key metadata, migrate if not let needs_pk_migration = dataset .schema() @@ -2865,6 +2891,9 @@ impl ManifestNamespace { .unwrap_or(false); if needs_pk_migration { + // This legacy migration writes to the manifest, so confirm this + // build is allowed to write the current format first. + ensure_writable(dataset.metadata())?; log::info!("Migrating __manifest table to add primary key metadata on object_id"); dataset .update_field_metadata() @@ -2894,15 +2923,24 @@ impl ManifestNamespace { .map(|v| v != "true") .unwrap_or(true); - if needs_flag - && let Err(e) = dataset + // Persisting this flag commits a new manifest version, so it must + // respect the writer feature-flag check. Degrade gracefully on an + // unwritable manifest (don't fail the open) so reads still work. + if needs_flag { + if let Err(e) = ensure_writable(dataset.metadata()) { + log::warn!( + "Not persisting table_version_storage_enabled on an unwritable __manifest: {:?}", + e + ); + } else if let Err(e) = dataset .update_metadata([("table_version_storage_enabled", "true")]) .await - { - log::warn!( - "Failed to persist table_version_storage_enabled flag in __manifest: {:?}", - e - ); + { + log::warn!( + "Failed to persist table_version_storage_enabled flag in __manifest: {:?}", + e + ); + } } } @@ -3295,6 +3333,10 @@ impl LanceNamespace for ManifestNamespace { let (namespace, table_name) = Self::split_object_id(table_id); let object_id = Self::build_object_id(&namespace, &table_name); + // Refuse before writing any table data if this build cannot write the + // manifest, so a refused create leaves no orphaned dataset behind. + self.ensure_manifest_writable().await?; + let existing_table = self.query_manifest_for_table(&object_id).await?; let existing_has_manifests = if let Some(existing_table) = &existing_table { Some( @@ -4256,6 +4298,123 @@ mod tests { buffer } + /// Open the `__manifest` dataset directly and set a table-metadata key, + /// simulating a future Lance client that persisted a feature flag. + async fn set_manifest_table_metadata(temp_path: &str, key: &str, value: &str) { + use lance::dataset::builder::DatasetBuilder; + let mut ds = DatasetBuilder::from_uri(format!("{}/{}", temp_path, MANIFEST_TABLE_NAME)) + .load() + .await + .unwrap(); + ds.update_metadata([(key, value)]).await.unwrap(); + } + + async fn create_namespace_with_one_table(temp_path: &str) { + let ns = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["t1".to_string()]); + ns.create_table(create_request, Bytes::from(create_test_ipc_data())) + .await + .unwrap(); + } + + /// This is a forward-compatibility checker only: it must not set any feature + /// flag, so existing clients keep treating the manifest as compatible. + #[tokio::test] + async fn test_manifest_has_no_feature_flags_by_default() { + use lance::dataset::builder::DatasetBuilder; + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + create_namespace_with_one_table(temp_path).await; + + let ds = DatasetBuilder::from_uri(format!("{}/{}", temp_path, MANIFEST_TABLE_NAME)) + .load() + .await + .unwrap(); + assert!( + !ds.metadata() + .contains_key(crate::dir::manifest_feature_flags::READER_FEATURE_FLAGS_KEY) + ); + assert!( + !ds.metadata() + .contains_key(crate::dir::manifest_feature_flags::WRITER_FEATURE_FLAGS_KEY) + ); + } + + /// An unknown reader feature flag must block opening the catalog with a clear + /// "please upgrade" error rather than silently degrading to directory listing. + #[tokio::test] + async fn test_unknown_reader_flag_blocks_access() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + create_namespace_with_one_table(temp_path).await; + set_manifest_table_metadata( + temp_path, + crate::dir::manifest_feature_flags::READER_FEATURE_FLAGS_KEY, + "1", + ) + .await; + + let err = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .expect_err("opening a manifest with an unknown reader flag should fail"); + assert!( + err.to_string().to_lowercase().contains("upgrade"), + "expected an upgrade error, got: {err}" + ); + } + + /// An unknown writer feature flag must still allow reads but block writes. + #[tokio::test] + async fn test_unknown_writer_flag_blocks_writes_but_allows_reads() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + create_namespace_with_one_table(temp_path).await; + set_manifest_table_metadata( + temp_path, + crate::dir::manifest_feature_flags::WRITER_FEATURE_FLAGS_KEY, + "1", + ) + .await; + + let ns = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .expect("reads should still be allowed with only a writer flag set"); + let mut list_request = ListTablesRequest::new(); + list_request.id = Some(vec![]); + assert_eq!(ns.list_tables(list_request).await.unwrap().tables.len(), 1); + + // A refused write must not leave an orphaned table dataset behind. + let entries_before = dir_entry_names(temp_path); + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["t2".to_string()]); + let err = ns + .create_table(create_request, Bytes::from(create_test_ipc_data())) + .await + .expect_err("writing through an unknown writer flag should fail"); + assert!( + err.to_string().to_lowercase().contains("upgrade"), + "expected an upgrade error, got: {err}" + ); + assert_eq!( + entries_before, + dir_entry_names(temp_path), + "a refused create_table must not create an orphaned table directory" + ); + } + + fn dir_entry_names(path: &str) -> std::collections::BTreeSet { + std::fs::read_dir(path) + .unwrap() + .map(|e| e.unwrap().file_name().to_string_lossy().into_owned()) + .collect() + } + #[tokio::test] async fn test_manifest_rewrite_preserves_utf8_metadata_and_base_objects() { let temp_dir = TempStdDir::default(); diff --git a/rust/lance-namespace-impls/src/dir/manifest_feature_flags.rs b/rust/lance-namespace-impls/src/dir/manifest_feature_flags.rs new file mode 100644 index 00000000000..d0849ceda4f --- /dev/null +++ b/rust/lance-namespace-impls/src/dir/manifest_feature_flags.rs @@ -0,0 +1,194 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Reader/writer feature flags for the directory-catalog `__manifest` dataset. +//! +//! Forward-compatibility infrastructure for the `__manifest` Lance dataset, +//! analogous to the Lance table format's `reader_feature_flags` / +//! `writer_feature_flags` but describing the *catalog manifest* format (schema +//! and semantics) rather than the underlying Lance file format. The flags are +//! persisted in the `__manifest` dataset's `table_metadata` map. +//! +//! Each manifest feature owns one bit in a `u64` bitmask. A build may read a +//! `__manifest` only if it understands every set reader-flag bit, and may write +//! it only if it understands every set writer-flag bit; otherwise it fails fast +//! with a clear "please upgrade" error instead of silently misreading data. The +//! set of bits a build understands is `READER_KNOWN_FLAGS` / `WRITER_KNOWN_FLAGS`. +//! +//! This is the mechanism only: no manifest feature is defined yet, so the known +//! masks are `0` and nothing is ever set — every current manifest reads and +//! writes unchanged. The first format change that needs forward-compatibility +//! protection adds its bit to the known masks and stamps it on write; from then +//! on, builds without that bit refuse the new format rather than misreading it. +//! Manifests written before this mechanism carry no flag keys, which parse as +//! `0` and stay compatible with every build. + +use std::collections::HashMap; + +use lance_core::{Error, Result}; +use lance_namespace::error::NamespaceError; + +/// `table_metadata` key holding the reader feature-flag bitmask (decimal `u64`). +pub const READER_FEATURE_FLAGS_KEY: &str = "lance.namespace.manifest.reader_feature_flags"; +/// `table_metadata` key holding the writer feature-flag bitmask (decimal `u64`). +pub const WRITER_FEATURE_FLAGS_KEY: &str = "lance.namespace.manifest.writer_feature_flags"; + +/// Reader feature-flag bits this build understands. No manifest feature is +/// defined yet, so this build understands none and refuses any non-zero reader +/// flag. A future format change adds its bit here. +const READER_KNOWN_FLAGS: u64 = 0; +/// Writer feature-flag bits this build understands. +const WRITER_KNOWN_FLAGS: u64 = 0; + +/// Whether this build can read a `__manifest` whose persisted reader feature +/// flags are `reader_flags` — i.e. it understands every set bit. +pub fn can_read_manifest(reader_flags: u64) -> bool { + (reader_flags & !READER_KNOWN_FLAGS) == 0 +} + +/// Whether this build can write a `__manifest` whose persisted writer feature +/// flags are `writer_flags` — i.e. it understands every set bit. +pub fn can_write_manifest(writer_flags: u64) -> bool { + (writer_flags & !WRITER_KNOWN_FLAGS) == 0 +} + +fn parse_flags(table_metadata: &HashMap, key: &str) -> Result { + match table_metadata.get(key) { + None => Ok(0), + Some(raw) => raw.parse::().map_err(|e| { + Error::from(NamespaceError::Unsupported { + message: format!( + "The __manifest dataset has an unparsable feature-flag value '{raw}' for \ + '{key}': {e}. This likely means it was written by a newer, incompatible \ + version of Lance; please upgrade Lance to use this catalog." + ), + }) + }), + } +} + +/// Reader feature flags persisted in the `__manifest` `table_metadata` (`0` if absent). +pub fn reader_flags(table_metadata: &HashMap) -> Result { + parse_flags(table_metadata, READER_FEATURE_FLAGS_KEY) +} + +/// Writer feature flags persisted in the `__manifest` `table_metadata` (`0` if absent). +pub fn writer_flags(table_metadata: &HashMap) -> Result { + parse_flags(table_metadata, WRITER_FEATURE_FLAGS_KEY) +} + +/// Validate that this build can READ the `__manifest` described by `table_metadata`, +/// returning a clear "please upgrade" error otherwise. +pub fn ensure_readable(table_metadata: &HashMap) -> Result<()> { + let flags = reader_flags(table_metadata)?; + if !can_read_manifest(flags) { + return Err(Error::from(NamespaceError::Unsupported { + message: format!( + "The __manifest dataset was written with reader feature flags {flags}, which this \ + version of Lance does not understand (known reader flags: {READER_KNOWN_FLAGS}). \ + Please upgrade Lance to read this catalog." + ), + })); + } + Ok(()) +} + +/// Validate that this build can WRITE the `__manifest` described by `table_metadata`, +/// returning a clear "please upgrade" error otherwise. +pub fn ensure_writable(table_metadata: &HashMap) -> Result<()> { + let flags = writer_flags(table_metadata)?; + if !can_write_manifest(flags) { + return Err(Error::from(NamespaceError::Unsupported { + message: format!( + "The __manifest dataset was written with writer feature flags {flags}, which this \ + version of Lance does not understand (known writer flags: {WRITER_KNOWN_FLAGS}). \ + Please upgrade Lance to modify this catalog." + ), + })); + } + Ok(()) +} + +/// Whether `err` indicates the `__manifest` is in a format this build cannot +/// handle — i.e. it carries an unknown reader/writer feature flag, surfaced by +/// [`ensure_readable`] / [`ensure_writable`] as a [`NamespaceError::Unsupported`]. +/// +/// Catalog initialization uses this to refuse opening such a manifest rather +/// than silently degrading to a directory-listing view that ignores it. The +/// `__manifest` open path raises no other `Unsupported` error, so matching the +/// code is sufficient and avoids brittle message matching. +pub fn is_incompatible_manifest_error(err: &Error) -> bool { + matches!( + err, + Error::Namespace { source, .. } + if source + .downcast_ref::() + .is_some_and(|e| matches!(e, NamespaceError::Unsupported { .. })) + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn meta(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn unflagged_is_compatible() { + assert!(can_read_manifest(0)); + assert!(can_write_manifest(0)); + let empty = HashMap::new(); + assert!(ensure_readable(&empty).is_ok()); + assert!(ensure_writable(&empty).is_ok()); + assert_eq!(reader_flags(&empty).unwrap(), 0); + assert_eq!(writer_flags(&empty).unwrap(), 0); + // Explicit zeroes are also compatible. + let zeroed = meta(&[ + (READER_FEATURE_FLAGS_KEY, "0"), + (WRITER_FEATURE_FLAGS_KEY, "0"), + ]); + assert!(ensure_readable(&zeroed).is_ok()); + assert!(ensure_writable(&zeroed).is_ok()); + } + + #[test] + fn any_unknown_flag_is_refused() { + // This build understands no feature flags, so any non-zero bit is refused. + assert!(!can_read_manifest(1)); + assert!(!can_write_manifest(1)); + assert!(!can_read_manifest(1 << 30)); + assert!(!can_write_manifest(1 << 63)); + + let reader = meta(&[(READER_FEATURE_FLAGS_KEY, "1")]); + let err = ensure_readable(&reader).unwrap_err(); + assert!(err.to_string().to_lowercase().contains("upgrade")); + assert!(is_incompatible_manifest_error(&err)); + // A reader flag does not block writers that the writer mask allows. + assert!(ensure_writable(&reader).is_ok()); + + let writer = meta(&[(WRITER_FEATURE_FLAGS_KEY, "2")]); + let err = ensure_writable(&writer).unwrap_err(); + assert!(err.to_string().to_lowercase().contains("upgrade")); + assert!(is_incompatible_manifest_error(&err)); + } + + #[test] + fn unparsable_value_is_refused() { + let m = meta(&[(READER_FEATURE_FLAGS_KEY, "not-a-number")]); + assert!(reader_flags(&m).is_err()); + assert!(ensure_readable(&m).is_err()); + } + + #[test] + fn unrelated_error_is_not_an_incompatibility() { + let other = Error::from(NamespaceError::TableNotFound { + message: "x".to_string(), + }); + assert!(!is_incompatible_manifest_error(&other)); + } +}