diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 6adc233d8a7..2a6921530c3 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -11779,6 +11779,40 @@ mod tests { ); } + #[tokio::test] + async fn test_manifest_reload_observes_new_version_from_other_namespace() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace_a = DirectoryNamespaceBuilder::new(temp_path) + .manifest_enabled(true) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + create_scalar_table(&namespace_a, "alpha").await; + + let namespace_b = DirectoryNamespaceBuilder::new(temp_path) + .manifest_enabled(true) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + create_scalar_table(&namespace_b, "beta").await; + + let response = namespace_a + .list_tables(ListTablesRequest { + id: Some(vec![]), + ..Default::default() + }) + .await + .unwrap(); + + let mut tables = response.tables; + tables.sort(); + assert_eq!(tables, vec!["alpha", "beta"]); + } + #[tokio::test] async fn test_migration_not_found_errors_include_table_id() { let temp_dir = TempStdDir::default(); diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 067239b8765..11c9e1c193d 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -670,13 +670,23 @@ pub struct NamespaceInfo { /// A wrapper around a Dataset that provides concurrent access. /// /// This can be cloned cheaply. It supports concurrent reads or exclusive writes. -/// The manifest dataset is always kept strongly consistent by reloading on each read. +/// The manifest dataset uses contiguous attached versions and this module never +/// runs old-version cleanup on it, allowing reads to check only the immediate +/// successor manifest before deciding whether a reload is needed. #[derive(Debug, Clone)] pub struct DatasetConsistencyWrapper(Arc>); impl DatasetConsistencyWrapper { /// Create a new wrapper with the given dataset. pub fn new(dataset: Dataset) -> Self { + debug_assert!( + !dataset + .manifest() + .config + .keys() + .any(|key| key.starts_with("lance.auto_cleanup.")), + "the directory manifest dataset must not enable old-version cleanup" + ); Self(Arc::new(RwLock::new(dataset))) } @@ -728,21 +738,25 @@ impl DatasetConsistencyWrapper { dataset_uri, current_version ); - let latest_version = read_guard.latest_version_id().await.map_err(|e| { + // The directory manifest table uses contiguous attached versions and + // does not run old-version cleanup, so the immediate successor probe is + // enough to detect changes without resolving or loading the latest + // manifest on every namespace read. + let has_successor_version = read_guard.has_successor_version().await.map_err(|e| { lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to get latest version: {:?}", e), + message: format!("Failed to check dataset staleness: {:?}", e), }) })?; log::debug!( - "Reload got latest_version={} for uri={}, current_version={}", - latest_version, + "Reload checked successor_version_exists={} for uri={}, current_version={}", + has_successor_version, dataset_uri, current_version ); drop(read_guard); // If already up-to-date, return early - if latest_version == current_version { + if !has_successor_version { log::debug!("Already up-to-date for uri={}", dataset_uri); return Ok(()); } @@ -751,13 +765,13 @@ impl DatasetConsistencyWrapper { let mut write_guard = self.0.write().await; // Double-check after acquiring write lock (someone else might have reloaded) - let latest_version = write_guard.latest_version_id().await.map_err(|e| { + let has_successor_version = write_guard.has_successor_version().await.map_err(|e| { lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to get latest version: {:?}", e), + message: format!("Failed to check dataset staleness: {:?}", e), }) })?; - if latest_version != write_guard.version().version { + if has_successor_version { write_guard.checkout_latest().await.map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!("Failed to checkout latest: {:?}", e), diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 3784e84a785..e1a4086730b 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -798,6 +798,26 @@ pub trait CommitHandler: Debug + Send + Sync { default_resolve_version(base_path, version, object_store).await } + /// Check whether an attached manifest version exists without loading it. + /// + /// The default implementation probes the deterministic manifest path for + /// the given naming scheme. Commit handlers with an external source of + /// truth should override this method. + async fn version_exists( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + naming_scheme: ManifestNamingScheme, + ) -> Result { + let path = naming_scheme.manifest_path(base_path, version); + match object_store.head(&path).await { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(e) => Err(e.into()), + } + } + /// List detached manifest locations. /// /// Returns a stream of detached manifest locations in arbitrary order. diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 75993ca8d1f..a6c9bbaa90d 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -456,6 +456,31 @@ impl CommitHandler for ExternalManifestCommitHandler { .await } + async fn version_exists( + &self, + base_path: &Path, + version: u64, + object_store: &dyn OSObjectStore, + naming_scheme: ManifestNamingScheme, + ) -> Result { + match self + .external_manifest_store + .get_manifest_location(base_path.as_ref(), version) + .await + { + Ok(_) => Ok(true), + Err(Error::NotFound { .. }) => { + let path = naming_scheme.manifest_path(base_path, version); + match object_store.head(&path).await { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(e) => Err(e.into()), + } + } + Err(e) => Err(e), + } + } + async fn commit( &self, manifest: &mut Manifest, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index f6fd1ef6a20..db9029eb55b 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2235,6 +2235,39 @@ impl Dataset { .version) } + /// Return whether the dataset has a newer committed version. + pub async fn is_stale(&self) -> Result { + let latest_version = self.latest_version_id().await?; + Ok(latest_version != self.manifest.version) + } + + /// Return whether the immediate attached successor manifest exists. + /// + /// This is a fast contiguous-history probe. It does not resolve the latest + /// version and may return `false` if intermediate manifests have been + /// removed. Callers that need a general freshness check should use + /// [`Self::is_stale`]. + #[doc(hidden)] + pub async fn has_successor_version(&self) -> Result { + let Some(next_version) = self.manifest.version.checked_add(1) else { + return Ok(false); + }; + if lance_table::format::is_detached_version(next_version) { + return Ok(false); + } + + let exists = self + .commit_handler + .version_exists( + &self.base, + next_version, + self.object_store.inner.as_ref(), + self.manifest_location.naming_scheme, + ) + .await?; + Ok(exists) + } + pub fn count_fragments(&self) -> usize { self.manifest.fragments.len() } diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index a0bc7816a32..c04dd0f3183 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -211,6 +211,77 @@ async fn test_version_id_fast_path() { assert_eq!(historical.latest_version_id().await.unwrap(), 2); } +#[rstest] +#[tokio::test] +async fn test_stale_checks_cover_fast_successor_and_latest_version( + #[values(false, true)] enable_v2_manifest_paths: bool, +) { + let expected_scheme = if enable_v2_manifest_paths { + ManifestNamingScheme::V2 + } else { + ManifestNamingScheme::V1 + }; + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::UInt32, + false, + )])); + + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt32Array::from_iter_values(0..5))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema.clone()); + + let original = Dataset::write( + reader, + &test_uri, + Some(WriteParams { + enable_v2_manifest_paths, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(original.manifest_location().naming_scheme, expected_scheme); + assert!(!original.is_stale().await.unwrap()); + assert!(!original.has_successor_version().await.unwrap()); + + let data = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt32Array::from_iter_values(5..10))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema); + let updated = Dataset::write( + reader, + &test_uri, + Some(WriteParams { + mode: WriteMode::Append, + enable_v2_manifest_paths, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert!(original.is_stale().await.unwrap()); + assert!(original.has_successor_version().await.unwrap()); + assert_eq!(updated.manifest_location().naming_scheme, expected_scheme); + assert!(!updated.is_stale().await.unwrap()); + assert!(!updated.has_successor_version().await.unwrap()); + + let historical = updated.checkout_version(1).await.unwrap(); + assert_eq!( + historical.manifest_location().naming_scheme, + expected_scheme + ); + assert!(historical.is_stale().await.unwrap()); + assert!(historical.has_successor_version().await.unwrap()); +} + #[rstest] #[tokio::test] async fn test_restore( diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index df2b84a4878..eee4fbf07b6 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -365,6 +365,32 @@ mod test { assert_eq!(ds.version().version, 6); assert_eq!(ds.count_rows(None).await.unwrap(), 60); + { + inner_store.lock().await.remove(&(ds.base.to_string(), 6)); + } + assert!( + handler + .version_exists( + &ds.base, + 6, + ds.object_store.inner.as_ref(), + ds.manifest_location().naming_scheme, + ) + .await + .unwrap() + ); + assert!( + !handler + .version_exists( + &ds.base, + 7, + ds.object_store.inner.as_ref(), + ds.manifest_location().naming_scheme, + ) + .await + .unwrap() + ); + // Open without external store handler again, should see the newly sync'd commit let ds = DatasetBuilder::from_uri(ds_uri).load().await.unwrap(); assert_eq!(ds.version().version, 6);