diff --git a/Cargo.lock b/Cargo.lock index 32baccd1ce0..2050bd0698e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2719,6 +2719,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ + "powerfmt", "serde_core", ] @@ -5053,6 +5054,7 @@ dependencies = [ "serde_json", "sha2 0.10.9", "tempfile", + "time", "tokio", "tower", "tower-http 0.5.2", @@ -8840,11 +8842,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.48" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1aa89044e7786ffb2ec017acb22cb7de5b0be46d0f21aea2b224b8561e5db2" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde_core", @@ -8854,15 +8857,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.9" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1c906769ad99c88eaa54e728060edef082f8e358ff32030cb7c7d315e81109" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.28" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d3bfe86347f0cc659f586f01e26303ccd32418f26f30c7b0309b3ca3a07d695" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index df24deeca17..5733f730fc0 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -2276,6 +2276,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ + "powerfmt", "serde_core", ] @@ -4244,6 +4245,7 @@ dependencies = [ "roaring", 
"serde", "serde_json", + "time", "tokio", "tower", "tower-http 0.5.2", @@ -7146,11 +7148,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.48" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1aa89044e7786ffb2ec017acb22cb7de5b0be46d0f21aea2b224b8561e5db2" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde_core", @@ -7160,15 +7163,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.9" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1c906769ad99c88eaa54e728060edef082f8e358ff32030cb7c7d315e81109" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.28" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d3bfe86347f0cc659f586f01e26303ccd32418f26f30c7b0309b3ca3a07d695" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/python/Cargo.lock b/python/Cargo.lock index eb9a1abe6bb..f2f9990e7cd 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -2586,6 +2586,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ + "powerfmt", "serde_core", ] @@ -4576,6 +4577,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "time", "tokio", "tower", "tower-http 0.5.2", @@ -7939,11 +7941,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.48" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1aa89044e7786ffb2ec017acb22cb7de5b0be46d0f21aea2b224b8561e5db2" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", + 
"itoa", "num-conv", "powerfmt", "serde_core", @@ -7953,15 +7956,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.9" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1c906769ad99c88eaa54e728060edef082f8e358ff32030cb7c7d315e81109" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.28" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d3bfe86347f0cc659f586f01e26303ccd32418f26f30c7b0309b3ca3a07d695" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index c2bf057ee21..27b9a4bc0e2 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -79,6 +79,11 @@ base64 = { version = "0.22", optional = true } aws-sdk-sts = { version = "1.38.0", optional = true, default-features = false, features = ["default-https-client", "rt-tokio"] } aws-config = { workspace = true, optional = true } +# Pin: time 0.3.48 conflicts with aws-smithy-types (E0119: conflicting `From` impls), which this +# crate pulls in via the AWS credential vendor. Capping time here forces the workspace resolver to +# 0.3.47 even for no-lock builds. Not used directly; remove once the upstream conflict is resolved. 
+time = "=0.3.47" + # GCP credential vending dependencies (optional, enabled by "credential-vendor-gcp" feature) ring = { version = "0.17", optional = true } rustls-pki-types = { version = "1", optional = true } diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 6adc233d8a7..09106314971 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -195,9 +195,6 @@ pub struct DirectoryNamespaceBuilder { dir_listing_enabled: bool, inline_optimization_enabled: bool, table_version_tracking_enabled: bool, - /// When true, table versions are stored in the `__manifest` table instead of - /// relying on Lance's native version management. - table_version_storage_enabled: bool, /// When true, enables migration mode where the namespace checks the manifest first /// before falling back to directory listing for root-level tables. When false (default), /// root-level tables use directory listing directly without checking the manifest, @@ -233,10 +230,6 @@ impl std::fmt::Debug for DirectoryNamespaceBuilder { "table_version_tracking_enabled", &self.table_version_tracking_enabled, ) - .field( - "table_version_storage_enabled", - &self.table_version_storage_enabled, - ) .field( "dir_listing_to_manifest_migration_enabled", &self.dir_listing_to_manifest_migration_enabled, @@ -273,7 +266,6 @@ impl DirectoryNamespaceBuilder { dir_listing_enabled: true, // Default to enabled for backwards compatibility inline_optimization_enabled: true, table_version_tracking_enabled: false, // Default to disabled - table_version_storage_enabled: false, // Default to disabled dir_listing_to_manifest_migration_enabled: false, // Default to disabled credential_vendor_properties: HashMap::new(), context_provider: None, @@ -334,19 +326,6 @@ impl DirectoryNamespaceBuilder { self } - /// Enable or disable table version management through the `__manifest` table. 
- /// - /// When enabled, table versions are tracked as `table_version` entries in the - /// `__manifest` Lance table. This enables: - /// - Centralized version tracking instead of per-table `_versions/` directories - /// - /// Requires `manifest_enabled` to be true. - /// When disabled (default), version storage uses per-table storage operations. - pub fn table_version_storage_enabled(mut self, enabled: bool) -> Self { - self.table_version_storage_enabled = enabled; - self - } - /// Create a DirectoryNamespaceBuilder from properties HashMap. /// /// This method parses a properties map into builder configuration. @@ -464,12 +443,6 @@ impl DirectoryNamespaceBuilder { .and_then(|v| v.parse::().ok()) .unwrap_or(false); - // Extract table_version_storage_enabled (default: false) - let table_version_storage_enabled = properties - .get("table_version_storage_enabled") - .and_then(|v| v.parse::().ok()) - .unwrap_or(false); - // Extract dir_listing_to_manifest_migration_enabled (default: false) let dir_listing_to_manifest_migration_enabled = properties .get("dir_listing_to_manifest_migration_enabled") @@ -516,7 +489,6 @@ impl DirectoryNamespaceBuilder { dir_listing_enabled, inline_optimization_enabled, table_version_tracking_enabled, - table_version_storage_enabled, dir_listing_to_manifest_migration_enabled, credential_vendor_properties, context_provider: None, @@ -693,14 +665,6 @@ impl DirectoryNamespaceBuilder { /// - Connection to the storage backend fails /// - Storage options are invalid pub async fn build(self) -> Result { - // Validate: table_version_storage_enabled requires manifest_enabled - if self.table_version_storage_enabled && !self.manifest_enabled { - return Err(NamespaceError::InvalidInput { - message: "table_version_storage_enabled requires manifest_enabled=true".to_string(), - } - .into()); - } - let (object_store, base_path) = Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?; @@ -714,7 +678,6 @@ impl 
DirectoryNamespaceBuilder { self.dir_listing_enabled, self.inline_optimization_enabled, self.commit_retries, - self.table_version_storage_enabled, ) .await { @@ -759,7 +722,6 @@ impl DirectoryNamespaceBuilder { dir_listing_to_manifest_migration_enabled: self .dir_listing_to_manifest_migration_enabled, table_version_tracking_enabled: self.table_version_tracking_enabled, - table_version_storage_enabled: self.table_version_storage_enabled, credential_vendor, context_provider: self.context_provider, vend_input_storage_options: self.vend_input_storage_options, @@ -842,8 +804,6 @@ pub struct DirectoryNamespace { /// When true, `describe_table` returns `managed_versioning: true` to indicate /// commits should go through namespace table version APIs. table_version_tracking_enabled: bool, - /// When true, table versions are stored in the `__manifest` table. - table_version_storage_enabled: bool, /// Credential vendor created once during initialization. /// Used to vend temporary credentials for table access. credential_vendor: Option>, @@ -2211,18 +2171,16 @@ impl DirectoryNamespace { Ok(migrated_count) } - /// Delete physical manifest files for the given table version ranges (best-effort). + /// Delete physical manifest files for the given table version ranges. /// - /// This helper is used by `batch_delete_table_versions` in both the manifest-enabled - /// and non-manifest paths. It resolves each table's storage location, computes the - /// version file paths, and attempts to delete them. Errors are logged (best-effort) - /// when `best_effort` is true, or returned immediately when false. + /// This helper backs `batch_delete_table_versions`. It resolves each table's storage + /// location, computes the version file paths, and deletes them, returning an error on + /// the first failure other than a file that is already absent (which is skipped). /// /// Returns the number of files successfully deleted. 
async fn delete_physical_version_files( &self, table_entries: &[TableDeleteEntry], - best_effort: bool, branch: Option<&str>, ) -> Result { let mut deleted_count = 0i64; @@ -2268,22 +2226,13 @@ impl DirectoryNamespace { } Err(object_store::Error::NotFound { .. }) => {} Err(e) => { - if best_effort { - log::warn!( - "Failed to delete manifest file for version {} of table {:?}: {:?}", - v, - te.table_id, - e - ); - } else { - return Err(NamespaceError::Internal { - message: format!( - "Failed to delete version {} for table at '{}': {}", - v, table_uri, e - ), - } - .into()); + return Err(NamespaceError::Internal { + message: format!( + "Failed to delete version {} for table at '{}': {}", + v, table_uri, e + ), } + .into()); } } } @@ -2927,20 +2876,6 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { self.record_op("list_table_versions"); let branch = Self::normalized_branch(request.branch.as_deref())?; - // The manifest catalog has no branch concept, so a branch lists its own - // version chain from storage under its tree path instead. - if branch.is_none() - && self.table_version_storage_enabled - && let Some(ref manifest_ns) = self.manifest_ns - { - let table_id = request.id.clone().unwrap_or_default(); - let want_descending = request.descending == Some(true); - return manifest_ns - .list_table_versions(&table_id, want_descending, request.limit) - .await; - } - - // Fallback when table_version_storage is not enabled: list from _versions/ directory let table_uri = self.resolve_table_location(&request.id).await?; let table_uri = match branch { Some(b) => self.resolve_branch_location(&table_uri, b).await?, @@ -3087,43 +3022,6 @@ impl LanceNamespace for DirectoryNamespace { ); } - // Also record in __manifest (best-effort). Branches aren't tracked there, - // so for a branch the storage manifest above is the only record. 
- if branch.is_none() - && self.table_version_storage_enabled - && let Some(ref manifest_ns) = self.manifest_ns - { - let table_id_str = - manifest::ManifestNamespace::str_object_id(&request.id.clone().unwrap_or_default()); - let object_id = - manifest::ManifestNamespace::build_version_object_id(&table_id_str, version as i64); - let metadata_json = serde_json::json!({ - "manifest_path": final_path.to_string(), - "manifest_size": manifest_size, - "e_tag": final_meta.e_tag, - "naming_scheme": request.naming_scheme.as_deref().unwrap_or("V2"), - }) - .to_string(); - - if let Err(e) = manifest_ns - .insert_into_manifest_with_metadata( - vec![manifest::ManifestEntry { - object_id, - object_type: manifest::ObjectType::TableVersion, - location: None, - metadata: Some(metadata_json), - }], - None, - ) - .await - { - log::warn!( - "Failed to record table version in __manifest (best-effort): {:?}", - e - ); - } - } - Ok(CreateTableVersionResponse { transaction_id: None, version: Some(Box::new(TableVersion { @@ -3143,18 +3041,6 @@ impl LanceNamespace for DirectoryNamespace { ) -> Result { self.record_op("describe_table_version"); let branch = Self::normalized_branch(request.branch.as_deref())?; - // When table_version_storage_enabled and a specific version is requested, - // query from __manifest to avoid opening the entire dataset. A branch has - // no manifest-catalog entry, so it resolves from storage instead. - if branch.is_none() - && self.table_version_storage_enabled - && let (Some(manifest_ns), Some(version)) = (&self.manifest_ns, request.version) - { - let table_id = request.id.clone().unwrap_or_default(); - return manifest_ns.describe_table_version(&table_id, version).await; - } - - // Fallback when table_version_storage is not enabled: inspect physical manifests directly. 
let table_uri = self.resolve_table_location(&request.id).await?; let table_uri = match branch { Some(b) => self.resolve_branch_location(&table_uri, b).await?, @@ -3206,9 +3092,9 @@ impl LanceNamespace for DirectoryNamespace { .map(|r| (r.start_version, r.end_version)) .collect(); - // Reject pathological bounded ranges up front: the manifest path below - // builds one id per version, so (0, i64::MAX) would exhaust memory. A - // through-latest range (end < 0) is bounded by the manifests that exist. + // Reject pathological bounded ranges up front: an explicit huge bounded + // range like (0, i64::MAX) is almost certainly a mistake. A through-latest + // range (end < 0) is bounded by the manifests that actually exist on storage. const MAX_VERSIONS_PER_REQUEST: i128 = 1_000_000; let requested: i128 = ranges .iter() @@ -3235,72 +3121,8 @@ impl LanceNamespace for DirectoryNamespace { ranges, }]; - // Branches are not tracked in the manifest catalog, so a branch skips the - // __manifest phase entirely and deletes its physical manifests directly. - if branch.is_none() - && self.table_version_storage_enabled - && let Some(ref manifest_ns) = self.manifest_ns - { - // Through-latest ranges (end_version < 0) would require enumerating the - // __manifest chain up to the latest version, which is not wired up here. - // Reject rather than silently delete physical files while leaving the - // __manifest records in place. - if table_entries - .iter() - .any(|te| te.ranges.iter().any(|&(_, e)| e < 0)) - { - return Err(NamespaceError::Unsupported { - message: "through-latest delete (end_version < 0) is not supported \ - for managed-versioning tables" - .to_string(), - } - .into()); - } - - // Phase 1 (atomic commit point): Delete version records from __manifest - // for ALL tables in a single atomic copy-on-write rewrite. This is the - // authoritative source of truth — once __manifest entries are removed, - // the versions are logically deleted across all tables atomically. 
- // - // Request `ranges` carry an exclusive end (`[start, end)`); the manifest - // rewrite API matches an inclusive `[start, end]`, so shift the end down - // by one. Empty ranges collapse to start > end and are dropped downstream. - let table_ranges = table_entries - .iter() - .map(|te| { - let object_id = manifest::ManifestNamespace::str_object_id( - &te.table_id.clone().unwrap_or_default(), - ); - let inclusive_ranges = te - .ranges - .iter() - .map(|&(start, end)| (start, end - 1)) - .collect::>(); - (object_id, inclusive_ranges) - }) - .collect::>(); - let total_deleted_count = manifest_ns - .batch_delete_table_versions_by_ranges(&table_ranges) - .await?; - - // Phase 2: Delete physical manifest files (best-effort). - // Even if some file deletions fail, the versions are already removed from - // __manifest, so they won't be visible to readers. Leftover files are - // orphaned but harmless and can be cleaned up later. - let _ = self - .delete_physical_version_files(&table_entries, true, branch) - .await; - - return Ok(BatchDeleteTableVersionsResponse { - deleted_count: Some(total_deleted_count), - transaction_id: None, - }); - } - - // Direct path: delete physical files (no __manifest). Reached when storage - // tracking is off, or for any branch (which has no __manifest entries). let total_deleted_count = self - .delete_physical_version_files(&table_entries, false, branch) + .delete_physical_version_files(&table_entries, branch) .await?; Ok(BatchDeleteTableVersionsResponse { @@ -5376,7 +5198,6 @@ mod tests { DirectoryNamespaceBuilder::new(temp.to_str().unwrap()) .manifest_enabled(true) .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) .ops_metrics_enabled(true) .build() .await @@ -5751,150 +5572,12 @@ mod tests { ); } - /// The managed `__manifest` delete path (the authoritative catalog) must honor - /// the exclusive end: `[min, max)` removes exactly min..max from `__manifest`, - /// keeping max. 
With storage tracking on, the writes register versions in - /// `__manifest` and `list_table_versions` reads it back, so this exercises the - /// Phase-1 path that the physical-path tests never reach. - #[tokio::test] - async fn test_batch_delete_managed_manifest_exclusive() { - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; - use lance_namespace::models::{BatchDeleteTableVersionsRequest, VersionRange}; - - let temp = TempStdDir::default(); - let ns: Arc = Arc::new( - DirectoryNamespaceBuilder::new(temp.to_str().unwrap()) - .manifest_enabled(true) - .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) - .build() - .await - .unwrap(), - ); - let table_id = vec!["users".to_string()]; - let schema = Arc::new(ArrowSchema::new(vec![Field::new( - "id", - DataType::Int32, - false, - )])); - let batch = |seed: i32| { - arrow::record_batch::RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from(vec![seed]))], - ) - .unwrap() - }; - - // Register v1, v2, v3 in __manifest via the managed write flow. - let mut ds = Dataset::write_into_namespace( - RecordBatchIterator::new(vec![Ok(batch(1))], schema.clone()), - ns.clone(), - table_id.clone(), - Some(WriteParams { - mode: WriteMode::Create, - ..Default::default() - }), - ) - .await - .unwrap(); - ds.append( - RecordBatchIterator::new(vec![Ok(batch(2))], schema.clone()), - None, - ) - .await - .unwrap(); - ds.append( - RecordBatchIterator::new(vec![Ok(batch(3))], schema.clone()), - None, - ) - .await - .unwrap(); - - let before = ns - .list_table_versions(ListTableVersionsRequest { - id: Some(table_id.clone()), - ..Default::default() - }) - .await - .unwrap() - .versions; - assert!( - before.len() >= 3, - "expected v1..v3 tracked in __manifest: {:?}", - before - ); - let min_v = before.iter().map(|v| v.version).min().unwrap(); - let max_v = before.iter().map(|v| v.version).max().unwrap(); - - // [min, max): exclusive end keeps max. 
- ns.batch_delete_table_versions(BatchDeleteTableVersionsRequest { - id: Some(table_id.clone()), - ranges: vec![VersionRange::new(min_v, max_v)], - ..Default::default() - }) - .await - .unwrap(); - - let after = ns - .list_table_versions(ListTableVersionsRequest { - id: Some(table_id.clone()), - ..Default::default() - }) - .await - .unwrap() - .versions; - assert_eq!( - after.len(), - 1, - "only the exclusive end (max) should remain in __manifest: {:?}", - after - ); - assert_eq!(after[0].version, max_v, "max must be kept"); - } - - /// On the managed path, a through-latest delete (`end_version < 0`) is rejected - /// rather than silently deleting physical files while leaving `__manifest` - /// records in place. - #[tokio::test] - async fn test_batch_delete_managed_rejects_through_latest() { - use lance_namespace::models::{BatchDeleteTableVersionsRequest, VersionRange}; - - let temp = TempStdDir::default(); - let ns: Arc = Arc::new( - DirectoryNamespaceBuilder::new(temp.to_str().unwrap()) - .manifest_enabled(true) - .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) - .build() - .await - .unwrap(), - ); - - let err = ns - .batch_delete_table_versions(BatchDeleteTableVersionsRequest { - id: Some(vec!["users".to_string()]), - ranges: vec![VersionRange::new(0, -1)], - ..Default::default() - }) - .await; - assert!( - err.is_err(), - "through-latest delete must be rejected on the managed path" - ); - assert!( - err.unwrap_err().to_string().contains("not supported"), - "expected a not-supported error" - ); - } - /// Build a managed (manifest-tracked) namespace over `path`. 
async fn create_managed_namespace(path: &str) -> Arc { Arc::new( DirectoryNamespaceBuilder::new(path) .manifest_enabled(true) .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) .build() .await .unwrap(), @@ -6324,7 +6007,6 @@ mod tests { DirectoryNamespaceBuilder::new(temp.to_str().unwrap()) .manifest_enabled(true) .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) .ops_metrics_enabled(true) .build() .await @@ -6470,49 +6152,6 @@ mod tests { ); } - /// With the manifest store enabled, branch ops must still bypass the catalog - /// fast-path and read the chain from `tree//_versions/`. Without the - /// `branch.is_none()` guard this would query `__manifest` (which has no - /// branch entries) and return the wrong result. The other branch tests use a - /// store-disabled namespace, so this pins the enabled path specifically. - #[tokio::test] - async fn test_branch_ops_skip_manifest_store_when_enabled() { - let temp_dir = TempStdDir::default(); - let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) - .manifest_enabled(true) - .table_version_storage_enabled(true) - .build() - .await - .unwrap(); - - create_scalar_table(&namespace, "users").await; - create_branch_with_commits(&namespace, "users", "exp", 2).await; - - // list resolves the branch chain from storage despite storage tracking - // being on (a successful result with tree/exp paths proves the bypass: - // the catalog has no "exp" entry, so the fast-path would not return these). - let branch_versions = list_versions(&namespace, "users", Some("exp")) - .await - .unwrap(); - assert!(branch_versions.len() >= 2); - assert!( - branch_versions - .iter() - .all(|v| v.manifest_path.contains("tree/exp")), - "branch versions must come from branch storage with the store enabled: {:?}", - branch_versions - ); - - // describe likewise resolves from the branch's storage. 
- let req = DescribeTableVersionRequest { - id: Some(vec!["users".to_string()]), - branch: Some("exp".to_string()), - ..Default::default() - }; - let resp = namespace.describe_table_version(req).await.unwrap(); - assert!(resp.version.manifest_path.contains("tree/exp")); - } - #[tokio::test] async fn test_create_table() { let (namespace, _temp_dir) = create_test_namespace().await; @@ -11277,155 +10916,6 @@ mod tests { } } - /// Tests for multi-table transaction support via table_version_storage_enabled. - mod multi_table_transactions { - use super::*; - use futures::TryStreamExt; - use lance::dataset::builder::DatasetBuilder; - use lance_namespace::models::CreateTableVersionRequest; - - /// Helper to create a namespace with table_version_storage_enabled enabled - async fn create_managed_namespace(temp_path: &str) -> Arc { - Arc::new( - DirectoryNamespaceBuilder::new(temp_path) - .table_version_tracking_enabled(true) - .table_version_storage_enabled(true) - .manifest_enabled(true) - .build() - .await - .unwrap(), - ) - } - - /// Helper to create a table and get its staging manifest path - async fn create_table_and_get_staging( - namespace: Arc, - table_name: &str, - ) -> (Vec, object_store::path::Path) { - let schema = create_test_schema(); - let ipc_data = create_test_ipc_data(&schema); - let mut create_req = CreateTableRequest::new(); - create_req.id = Some(vec![table_name.to_string()]); - namespace - .create_table(create_req, bytes::Bytes::from(ipc_data)) - .await - .unwrap(); - - let table_id = vec![table_name.to_string()]; - let dataset = DatasetBuilder::from_namespace(namespace.clone(), table_id.clone()) - .await - .unwrap() - .load() - .await - .unwrap(); - - // Find existing manifest and create a staging copy - let versions_path = dataset.versions_dir(); - let manifest_metas: Vec<_> = dataset - .object_store(None) - .await - .unwrap() - .inner - .list(Some(&versions_path)) - .try_collect() - .await - .unwrap(); - - let manifest_meta = manifest_metas - .iter() 
- .find(|m| { - m.location - .filename() - .map(|f| f.ends_with(".manifest")) - .unwrap_or(false) - }) - .expect("No manifest file found"); - - let manifest_data = dataset - .object_store(None) - .await - .unwrap() - .inner - .get(&manifest_meta.location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - - let staging_path = dataset - .versions_dir() - .join(format!("staging_{}", table_name)); - dataset - .object_store(None) - .await - .unwrap() - .inner - .put(&staging_path, manifest_data.into()) - .await - .unwrap(); - - (table_id, staging_path) - } - - #[tokio::test] - async fn test_table_version_storage_enabled_requires_manifest() { - // table_version_storage_enabled=true requires manifest_enabled=true - let temp_dir = TempStdDir::default(); - let temp_path = temp_dir.to_str().unwrap(); - - let result = DirectoryNamespaceBuilder::new(temp_path) - .table_version_storage_enabled(true) - .manifest_enabled(false) - .build() - .await; - - assert!( - result.is_err(), - "Should fail when table_version_storage_enabled=true but manifest_enabled=false" - ); - } - - #[tokio::test] - async fn test_create_table_version_records_in_manifest() { - // When table_version_storage_enabled is enabled, single create_table_version - // should also record the version in __manifest - let temp_dir = TempStrDir::default(); - let temp_path: &str = &temp_dir; - - let namespace = create_managed_namespace(temp_path).await; - let ns: Arc = namespace.clone(); - - let (table_id, staging_path) = - create_table_and_get_staging(ns.clone(), "table_managed").await; - - // Create version 2 - let mut create_req = CreateTableVersionRequest::new(2, staging_path.to_string()); - create_req.id = Some(table_id.clone()); - create_req.naming_scheme = Some("V2".to_string()); - let response = namespace.create_table_version(create_req).await.unwrap(); - - assert!(response.version.is_some()); - let version = response.version.unwrap(); - assert_eq!(version.version, 2); - - // Verify the version is recorded 
in __manifest by querying it - let manifest_ns = namespace.manifest_ns.as_ref().unwrap(); - let table_id_str = manifest::ManifestNamespace::str_object_id(&table_id); - let versions = manifest_ns - .query_table_versions(&table_id_str, false, None) - .await - .unwrap(); - - assert!( - !versions.is_empty(), - "Version should be recorded in __manifest" - ); - let (ver, _path) = &versions[0]; - assert_eq!(*ver, 2, "Recorded version should be 2"); - } - } - #[tokio::test] async fn test_list_all_tables() { use lance_namespace::models::ListTablesRequest; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 067239b8765..9fc82bcb4c5 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -44,11 +44,10 @@ use lance_namespace::models::{ CreateNamespaceRequest, CreateNamespaceResponse, CreateTableRequest, CreateTableResponse, DeclareTableRequest, DeclareTableResponse, DeregisterTableRequest, DeregisterTableResponse, DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest, - DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest, - DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest, - ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, - NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest, - TableVersion, + DescribeTableResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, + DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, + ListTablesResponse, NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, + TableExistsRequest, }; use lance_namespace::schema::arrow_schema_to_json; use lance_table::feature_flags::apply_feature_flags; @@ -94,7 +93,6 @@ const MANIFEST_INDEX_BATCH_SIZE: usize = 8192; pub enum ObjectType { Namespace, Table, - TableVersion, } impl 
ObjectType { @@ -102,7 +100,6 @@ impl ObjectType { match self { Self::Namespace => "namespace", Self::Table => "table", - Self::TableVersion => "table_version", } } @@ -110,7 +107,6 @@ impl ObjectType { match s { "namespace" => Ok(Self::Namespace), "table" => Ok(Self::Table), - "table_version" => Ok(Self::TableVersion), _ => Err(NamespaceError::Internal { message: format!("Invalid object type: {}", s), } @@ -173,7 +169,7 @@ pub struct TableInfo { pub struct ManifestEntry { /// The unique object identifier (e.g., table name or version object_id) pub object_id: String, - /// The type of the object (Namespace, Table, or TableVersion) + /// The type of the object (Namespace or Table) pub object_type: ObjectType, /// The storage location (e.g., directory name for tables) pub location: Option, @@ -576,89 +572,6 @@ impl ManifestStreamMutation for DeleteObjectMutation { } } -enum DeleteTableVersionsTarget { - ObjectIds(HashSet), - Ranges(Vec), -} - -#[derive(Clone)] -struct DeleteTableVersionRangeTarget { - object_id_prefix: String, - ranges: Vec<(i64, i64)>, -} - -impl DeleteTableVersionRangeTarget { - fn matches(&self, object_id: &str) -> bool { - let Some(version) = object_id - .strip_prefix(&self.object_id_prefix) - .and_then(|suffix| suffix.parse::().ok()) - else { - return false; - }; - - self.ranges - .iter() - .any(|(start, end)| *start <= version && version <= *end) - } -} - -impl DeleteTableVersionsTarget { - fn matches(&self, object_id: &str) -> bool { - match self { - Self::ObjectIds(object_ids) => object_ids.contains(object_id), - Self::Ranges(targets) => targets.iter().any(|target| target.matches(object_id)), - } - } -} - -struct DeleteTableVersionsMutation { - target: DeleteTableVersionsTarget, - deleted_count: i64, -} - -impl ManifestStreamMutation for DeleteTableVersionsMutation { - type Output = i64; - - fn process_existing_row( - &mut self, - row: ManifestRowValue, - output: &mut ManifestBatchBuilder, - index_data: &mut ManifestIndexAccumulator, - ) -> 
Result<()> { - if row.object_type == ObjectType::TableVersion && self.target.matches(&row.object_id) { - self.deleted_count += 1; - return Ok(()); - } - - output.append( - index_data, - ManifestOutputRow { - object_id: &row.object_id, - object_type: row.object_type, - location: row.location.as_deref(), - metadata: row.metadata.as_deref(), - base_objects: row.base_objects.as_deref(), - }, - ) - } - - fn append_rows( - &mut self, - _output: &mut ManifestBatchBuilder, - _index_data: &mut ManifestIndexAccumulator, - ) -> Result<()> { - Ok(()) - } - - fn finish(&self) -> CopyOnWriteMutation { - if self.deleted_count > 0 { - CopyOnWriteMutation::updated(self.deleted_count) - } else { - CopyOnWriteMutation::unchanged(0) - } - } -} - /// Information about a namespace stored in the manifest #[derive(Debug, Clone)] pub struct NamespaceInfo { @@ -908,15 +821,10 @@ impl ManifestNamespace { dir_listing_enabled: bool, inline_optimization_enabled: bool, commit_retries: Option, - table_version_storage_enabled: bool, ) -> Result { - let manifest_dataset = Self::ensure_manifest_table_up_to_date( - &root, - &storage_options, - session.clone(), - table_version_storage_enabled, - ) - .await?; + let manifest_dataset = + Self::ensure_manifest_table_up_to_date(&root, &storage_options, session.clone()) + .await?; Ok(Self { root, @@ -980,60 +888,6 @@ impl ManifestNamespace { format!("table id '{}'", Self::str_object_id(table_id)) } - /// Format a version number as a zero-padded lexicographically sortable string. - /// - /// Versions are stored as 20-digit zero-padded integers (e.g., `00000000000000000001` - /// for version 1) so that string-based range queries and sorting work correctly. - pub fn format_table_version(version: i64) -> String { - format!("{:020}", version) - } - - /// Build the object_id for a table version entry. 
- /// - /// Format: `{table_object_id}${zero_padded_version}` - pub fn build_version_object_id(table_object_id: &str, version: i64) -> String { - format!( - "{}{}{}", - table_object_id, - DELIMITER, - Self::format_table_version(version) - ) - } - - fn build_version_object_id_prefix(table_object_id: &str) -> String { - format!("{}{}", table_object_id, DELIMITER) - } - - fn normalize_table_version_ranges(ranges: &[(i64, i64)]) -> Vec<(i64, i64)> { - let mut normalized = ranges - .iter() - .filter_map(|(start, end)| (*start <= *end).then_some((*start, *end))) - .collect::>(); - normalized.sort_unstable(); - - let mut merged: Vec<(i64, i64)> = Vec::with_capacity(normalized.len()); - for (start, end) in normalized { - let Some((_last_start, last_end)) = merged.last_mut() else { - merged.push((start, end)); - continue; - }; - if start <= *last_end + 1 { - *last_end = (*last_end).max(end); - continue; - } - merged.push((start, end)); - } - merged - } - - /// Parse a version number from the version suffix of a table version object_id. - /// - /// The object_id is formatted as `{table_id}${zero_padded_version}`. - pub fn parse_version_from_object_id(object_id: &str) -> Option { - let (_namespace, name) = Self::parse_object_id(object_id); - name.parse::().ok() - } - /// Generate a new directory name in format: `_` /// The hash is used to (1) optimize object store throughput, /// (2) have high enough entropy in a short period of time to prevent issues like @@ -2409,318 +2263,6 @@ impl ManifestNamespace { .await } - /// Query the manifest for all versions of a table, sorted by version. - /// - /// Returns a list of (version, metadata_json_string) tuples where metadata_json_string - /// contains the full metadata JSON stored in the manifest (manifest_path, manifest_size, - /// e_tag, naming_scheme). - /// - /// **Known limitation**: All matching rows are loaded into memory, sorted in Rust, - /// and then truncated. 
For tables with a very large number of versions this may be - /// expensive. Pushing sort/limit into the scan is not yet supported by Lance. - pub async fn query_table_versions( - &self, - object_id: &str, - descending: bool, - limit: Option, - ) -> Result> { - let escaped_id = object_id.replace('\'', "''"); - // table_version object_ids are formatted as "{object_id}${zero_padded_version}" - let filter = format!( - "object_type = 'table_version' AND starts_with(object_id, '{}{}')", - escaped_id, DELIMITER - ); - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["object_id", "metadata"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), - }) - })?; - let batches = Self::execute_scanner(scanner).await?; - - let mut versions: Vec<(i64, String)> = Vec::new(); - for batch in batches { - if batch.num_rows() == 0 { - continue; - } - let object_id_array = Self::get_string_column(&batch, "object_id")?; - let metadata_array = Self::get_string_column(&batch, "metadata")?; - for i in 0..batch.num_rows() { - let oid = object_id_array.value(i); - // Parse version from object_id - if let Some(version) = Self::parse_version_from_object_id(oid) { - let metadata_str = metadata_array.value(i).to_string(); - versions.push((version, metadata_str)); - } - } - } - - if descending { - versions.sort_by(|a, b| b.0.cmp(&a.0)); - } else { - versions.sort_by(|a, b| a.0.cmp(&b.0)); - } - - if let Some(limit) = limit { - versions.truncate(limit as usize); - } - - Ok(versions) - } - - /// Query the manifest for a specific version of a table. - /// - /// Returns the full metadata JSON string if found, which contains - /// manifest_path, manifest_size, e_tag, and naming_scheme. 
- /// - pub async fn query_table_version( - &self, - object_id: &str, - version: i64, - ) -> Result> { - let version_object_id = Self::build_version_object_id(object_id, version); - self.query_table_version_by_object_id(&version_object_id) - .await - } - - /// Query a specific table version by its exact object_id. - async fn query_table_version_by_object_id( - &self, - version_object_id: &str, - ) -> Result> { - let escaped_id = version_object_id.replace('\'', "''"); - let filter = format!( - "object_id = '{}' AND object_type = 'table_version'", - escaped_id - ); - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["metadata"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), - }) - })?; - let batches = Self::execute_scanner(scanner).await?; - - for batch in batches { - if batch.num_rows() == 0 { - continue; - } - let metadata_array = Self::get_string_column(&batch, "metadata")?; - return Ok(Some(metadata_array.value(0).to_string())); - } - - Ok(None) - } - - /// Delete table version entries from the manifest for a given table and version ranges. - /// - /// Each range is (start_version, end_version) inclusive. Deletes all matching - /// `object_type = 'table_version'` entries whose object_id matches - /// `{object_id}${zero_padded_version}`. - /// - /// Applies the ranges while streaming the manifest rewrite, without expanding - /// sparse ranges into every possible version object id. - pub async fn delete_table_versions( - &self, - object_id: &str, - ranges: &[(i64, i64)], - ) -> Result { - self.batch_delete_table_versions_by_ranges(&[(object_id.to_string(), ranges.to_vec())]) - .await - } - - /// Atomically delete table version entries from the manifest for multiple - /// tables and version ranges. 
- pub async fn batch_delete_table_versions_by_ranges( - &self, - table_ranges: &[(String, Vec<(i64, i64)>)], - ) -> Result { - let targets = table_ranges - .iter() - .filter_map(|(object_id, ranges)| { - let ranges = Self::normalize_table_version_ranges(ranges); - if ranges.is_empty() { - None - } else { - Some(DeleteTableVersionRangeTarget { - object_id_prefix: Self::build_version_object_id_prefix(object_id), - ranges, - }) - } - }) - .collect::>(); - if targets.is_empty() { - return Ok(0); - } - - self.rewrite_manifest("Failed to delete table versions from manifest", || { - DeleteTableVersionsMutation { - target: DeleteTableVersionsTarget::Ranges(targets.clone()), - deleted_count: 0, - } - }) - .await - } - - /// Atomically delete table version entries from the manifest by their object_ids. - /// - /// This method supports multi-table transactional deletion: all specified - /// object_ids (which may span multiple tables) are deleted in a single atomic - /// copy-on-write manifest rewrite. Either all entries are removed or none are. - /// - /// Object IDs are formatted as `{table_id}${version}`. - pub async fn batch_delete_table_versions_by_object_ids( - &self, - object_ids: &[String], - ) -> Result { - if object_ids.is_empty() { - return Ok(0); - } - - let object_ids = object_ids.iter().cloned().collect::>(); - self.rewrite_manifest("Failed to delete table versions from manifest", || { - DeleteTableVersionsMutation { - target: DeleteTableVersionsTarget::ObjectIds(object_ids.clone()), - deleted_count: 0, - } - }) - .await - } - - /// Set a property flag in the __manifest table's metadata key-value map. - /// - /// This uses `dataset.update_metadata()` to persist the flag in the - /// __manifest dataset's table metadata, rather than inserting a row. - /// If the property already exists with the same value, this is a no-op. 
- pub async fn set_property(&self, name: &str, value: &str) -> Result<()> { - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - if dataset_guard.metadata().get(name) == Some(&value.to_string()) { - return Ok(()); - } - drop(dataset_guard); - - let mut dataset_guard = self.manifest_dataset.get_mut().await?; - dataset_guard - .update_metadata([(name, value)]) - .await - .map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!( - "Failed to set property '{}' in __manifest metadata: {}", - name, e - ), - }) - })?; - Ok(()) - } - - /// Check if a property flag exists in the __manifest table's metadata key-value map. - pub async fn has_property(&self, name: &str) -> Result { - let dataset_guard = self.manifest_dataset.get().await?; - Ok(dataset_guard.metadata().contains_key(name)) - } - - /// Parse metadata JSON into a `TableVersion`. - /// - /// Returns `None` if metadata is invalid or missing required fields. - fn parse_table_version(version: i64, metadata_str: &str) -> Option { - let meta: serde_json::Value = match serde_json::from_str(metadata_str) { - Ok(v) => v, - Err(e) => { - log::warn!( - "Skipping version {} due to invalid metadata JSON: {}", - version, - e - ); - return None; - } - }; - let manifest_path = match meta.get("manifest_path").and_then(|v| v.as_str()) { - Some(p) => p.to_string(), - None => { - log::warn!( - "Skipping version {} due to missing 'manifest_path' in metadata — \ - this may indicate data corruption", - version - ); - return None; - } - }; - let manifest_size = meta.get("manifest_size").and_then(|v| v.as_i64()); - let e_tag = meta - .get("e_tag") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - Some(TableVersion { - version, - manifest_path, - manifest_size, - e_tag, - timestamp_millis: None, - metadata: None, - }) - } - - /// List table versions from the __manifest table. 
- /// - /// Queries the manifest for all versions of the given table and returns - /// them as a `ListTableVersionsResponse`. - pub async fn list_table_versions( - &self, - table_id: &[String], - descending: bool, - limit: Option, - ) -> Result { - let object_id = Self::str_object_id(table_id); - let manifest_versions = self - .query_table_versions(&object_id, descending, limit) - .await?; - - let table_versions: Vec = manifest_versions - .into_iter() - .filter_map(|(version, metadata_str)| Self::parse_table_version(version, &metadata_str)) - .collect(); - - Ok(ListTableVersionsResponse { - versions: table_versions, - page_token: None, - }) - } - - /// Describe a specific table version from the __manifest table. - /// - /// Queries the manifest for a specific version and returns it as a - /// `DescribeTableVersionResponse`. Returns an error if the version is not found. - pub async fn describe_table_version( - &self, - table_id: &[String], - version: i64, - ) -> Result { - let object_id = Self::str_object_id(table_id); - if let Some(metadata_str) = self.query_table_version(&object_id, version).await? - && let Some(tv) = Self::parse_table_version(version, &metadata_str) - { - return Ok(DescribeTableVersionResponse { - version: Box::new(tv), - }); - } - Err(NamespaceError::TableVersionNotFound { - message: format!("version {} for table {:?}", version, table_id), - } - .into()) - } - /// Register a table in the manifest without creating the physical table (internal helper for migration) pub async fn register_table(&self, name: &str, location: String) -> Result<()> { let object_id = Self::build_object_id(&[], name); @@ -2825,12 +2367,10 @@ impl ManifestNamespace { /// 1. Try to load an existing manifest table /// 2. If it exists, check and migrate the schema if needed (e.g., add primary key metadata) /// 3. If it doesn't exist, create a new manifest table with the current schema - /// 4. 
Persist feature flags (e.g., table_version_storage_enabled) if requested async fn ensure_manifest_table_up_to_date( root: &str, storage_options: &Option>, session: Option>, - table_version_storage_enabled: bool, ) -> Result { let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); log::debug!("Attempting to load manifest from {}", manifest_path); @@ -2885,27 +2425,6 @@ impl ManifestNamespace { })?; } - // Persist table_version_storage_enabled flag in __manifest so that once - // enabled, it becomes a permanent property of this namespace. - if table_version_storage_enabled { - let needs_flag = dataset - .metadata() - .get("table_version_storage_enabled") - .map(|v| v != "true") - .unwrap_or(true); - - if needs_flag - && let Err(e) = dataset - .update_metadata([("table_version_storage_enabled", "true")]) - .await - { - log::warn!( - "Failed to persist table_version_storage_enabled flag in __manifest: {:?}", - e - ); - } - } - Ok(DatasetConsistencyWrapper::new(dataset)) } else { log::info!("Creating new manifest table at {}", manifest_path); @@ -4053,7 +3572,6 @@ mod tests { true, inline_optimization_enabled, commit_retries, - false, ) .await .unwrap() @@ -4410,90 +3928,6 @@ mod tests { ); } - #[tokio::test] - async fn test_manifest_delete_table_versions_by_ranges() { - let temp_dir = TempStdDir::default(); - let temp_path = temp_dir.to_str().unwrap(); - let manifest_ns = create_manifest_namespace(temp_path, false).await; - let table_id = "table"; - let entries = (1..=5) - .map(|version| ManifestEntry { - object_id: ManifestNamespace::build_version_object_id(table_id, version), - object_type: ObjectType::TableVersion, - location: None, - metadata: Some( - serde_json::json!({ - "manifest_path": format!("_versions/{}.manifest", version), - }) - .to_string(), - ), - }) - .collect::>(); - manifest_ns - .insert_into_manifest_with_metadata(entries, None) - .await - .unwrap(); - - let deleted = manifest_ns - .delete_table_versions(table_id, &[(2, 3), (5, 5)]) - 
.await - .unwrap(); - assert_eq!(deleted, 3); - - let remaining = manifest_ns - .query_table_versions(table_id, false, None) - .await - .unwrap() - .into_iter() - .map(|(version, _)| version) - .collect::>(); - assert_eq!(remaining, vec![1, 4]); - } - - #[tokio::test] - async fn test_manifest_delete_table_versions_by_object_ids() { - let temp_dir = TempStdDir::default(); - let temp_path = temp_dir.to_str().unwrap(); - let manifest_ns = create_manifest_namespace(temp_path, false).await; - let table_id = "table"; - let entries = (1..=3) - .map(|version| ManifestEntry { - object_id: ManifestNamespace::build_version_object_id(table_id, version), - object_type: ObjectType::TableVersion, - location: None, - metadata: Some( - serde_json::json!({ - "manifest_path": format!("_versions/{}.manifest", version), - }) - .to_string(), - ), - }) - .collect::>(); - manifest_ns - .insert_into_manifest_with_metadata(entries, None) - .await - .unwrap(); - - let object_ids = vec![ - ManifestNamespace::build_version_object_id(table_id, 1), - ManifestNamespace::build_version_object_id(table_id, 3), - ]; - let deleted = manifest_ns - .batch_delete_table_versions_by_object_ids(&object_ids) - .await - .unwrap(); - assert_eq!(deleted, 2); - - let remaining = manifest_ns - .query_table_versions(table_id, false, None) - .await - .unwrap() - .into_iter() - .map(|(version, _)| version) - .collect::>(); - assert_eq!(remaining, vec![2]); - } - #[tokio::test] async fn test_manifest_noop_delete_uses_latest_snapshot() { let temp_dir = TempStdDir::default(); diff --git a/rust/lance-namespace-impls/src/rest_adapter.rs b/rust/lance-namespace-impls/src/rest_adapter.rs index 7324ab0bb0e..44ebd866810 100644 --- a/rust/lance-namespace-impls/src/rest_adapter.rs +++ b/rust/lance-namespace-impls/src/rest_adapter.rs @@ -1527,8 +1527,7 @@ mod tests { } /// Like [`Self::new`], with managed versioning (table version - /// tracking through the `__manifest` catalog) enabled on the - /// backend. 
+ /// tracking) enabled on the backend. async fn new_managed() -> Self { Self::build(true).await } @@ -1540,9 +1539,7 @@ mod tests { // Create DirectoryNamespace backend with manifest enabled let mut builder = DirectoryNamespaceBuilder::new(&temp_path).manifest_enabled(true); if managed_versioning { - builder = builder - .table_version_tracking_enabled(true) - .table_version_storage_enabled(true); + builder = builder.table_version_tracking_enabled(true); } let backend = builder.build().await.unwrap(); let backend = Arc::new(backend);