From 09f52ddbf35d25b30dde66248e696cea452c0a18 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 9 Jun 2026 02:26:00 -0700 Subject: [PATCH 1/3] perf(dir-catalog): rewrite manifest mutations with copy-on-write --- Cargo.lock | 4 + python/Cargo.lock | 4 + rust/lance-namespace-datafusion/tests/sql.rs | 2 + rust/lance-namespace-impls/Cargo.toml | 4 + rust/lance-namespace-impls/src/dir.rs | 64 +- .../lance-namespace-impls/src/dir/manifest.rs | 2695 ++++++++++++++--- 6 files changed, 2291 insertions(+), 482 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 866eb9b4b0e..75b7f902d7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5028,6 +5028,8 @@ dependencies = [ "base64 0.22.1", "bytes", "chrono", + "datafusion-common", + "datafusion-physical-plan", "futures", "hmac 0.12.1", "lance", @@ -5045,6 +5047,7 @@ dependencies = [ "rand 0.9.4", "reqwest 0.12.28", "ring", + "roaring", "rstest", "rustls-pki-types", "serde", @@ -5055,6 +5058,7 @@ dependencies = [ "tower", "tower-http 0.5.2", "url", + "uuid", "wiremock", ] diff --git a/python/Cargo.lock b/python/Cargo.lock index 879195811cf..5a6fb26be91 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4560,6 +4560,8 @@ dependencies = [ "async-trait", "axum", "bytes", + "datafusion-common", + "datafusion-physical-plan", "futures", "lance", "lance-core", @@ -4572,12 +4574,14 @@ dependencies = [ "object_store", "rand 0.9.4", "reqwest 0.12.28", + "roaring", "serde", "serde_json", "tokio", "tower", "tower-http 0.5.2", "url", + "uuid", ] [[package]] diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs index e49cd7e58e3..5332e831cb6 100755 --- a/rust/lance-namespace-datafusion/tests/sql.rs +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +#![recursion_limit = "256"] + use std::sync::Arc; use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index 53ff79fb333..56b8a5864ea 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -51,6 +51,8 @@ object_store = { workspace = true } arrow = { workspace = true } arrow-ipc = { workspace = true } arrow-schema = { workspace = true } +datafusion-common = { workspace = true } +datafusion-physical-plan = { workspace = true } # REST adapter implementation dependencies (optional, enabled by "rest-adapter" feature) axum = { workspace = true, optional = true } @@ -66,6 +68,8 @@ serde_json = { workspace = true } futures.workspace = true log.workspace = true rand.workspace = true +roaring.workspace = true +uuid.workspace = true # Shared credential vending dependencies sha2 = { version = "0.10", optional = true } diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 053b339a4f3..c88878369cf 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -313,11 +313,10 @@ impl DirectoryNamespaceBuilder { self } - /// Enable or disable inline optimization of the __manifest table. + /// Enable or disable replacement index maintenance for the __manifest table. /// - /// When enabled (default), performs compaction and indexing on the __manifest table - /// after every write operation to maintain optimal performance. - /// When disabled, manual optimization must be performed separately. + /// When enabled (default), copy-on-write manifest rewrites build replacement indices + /// for fast reads. When disabled, rewrites only replace data files. pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self { self.inline_optimization_enabled = enabled; self @@ -355,7 +354,7 @@ impl DirectoryNamespaceBuilder { /// - `root`: The root directory path (required) /// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true) /// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true) - /// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true) + /// - `inline_optimization_enabled`: Enable replacement indices on __manifest rewrites (optional, default: true) /// - `storage.*`: Storage options (optional, prefix will be stripped) /// /// Credential vendor properties (prefixed with `credential_vendor.`, prefix is stripped): @@ -2143,6 +2142,7 @@ impl DirectoryNamespace { /// to the manifest to enable manifest-only mode: /// /// ```no_run + /// #![recursion_limit = "256"] /// # use lance_namespace_impls::DirectoryNamespaceBuilder; /// # async fn example() -> Result<(), Box> { /// // Create namespace with dual mode (manifest + directory listing) @@ -3235,8 +3235,6 @@ impl LanceNamespace for DirectoryNamespace { ranges, }]; - let mut total_deleted_count = 0i64; - // Branches are not tracked in the manifest catalog, so a branch skips the // __manifest phase entirely and deletes its physical manifests directly. if branch.is_none() @@ -3260,32 +3258,30 @@ impl LanceNamespace for DirectoryNamespace { } // Phase 1 (atomic commit point): Delete version records from __manifest - // for ALL tables in a single atomic operation. This is the authoritative - // source of truth — once __manifest entries are removed, the versions - // are logically deleted across all tables atomically. - - // Collect all (table_id_str, ranges) for batch deletion - let mut all_object_ids: Vec = Vec::new(); - for te in &table_entries { - let table_id_str = manifest::ManifestNamespace::str_object_id( - &te.table_id.clone().unwrap_or_default(), - ); - for (start, end) in &te.ranges { - for version in *start..*end { - let object_id = manifest::ManifestNamespace::build_version_object_id( - &table_id_str, - version, - ); - all_object_ids.push(object_id); - } - } - } - - if !all_object_ids.is_empty() { - total_deleted_count = manifest_ns - .batch_delete_table_versions_by_object_ids(&all_object_ids) - .await?; - } + // for ALL tables in a single atomic copy-on-write rewrite. This is the + // authoritative source of truth — once __manifest entries are removed, + // the versions are logically deleted across all tables atomically. + // + // Request `ranges` carry an exclusive end (`[start, end)`); the manifest + // rewrite API matches an inclusive `[start, end]`, so shift the end down + // by one. Empty ranges collapse to start > end and are dropped downstream. + let table_ranges = table_entries + .iter() + .map(|te| { + let object_id = manifest::ManifestNamespace::str_object_id( + &te.table_id.clone().unwrap_or_default(), + ); + let inclusive_ranges = te + .ranges + .iter() + .map(|&(start, end)| (start, end - 1)) + .collect::>(); + (object_id, inclusive_ranges) + }) + .collect::>(); + let total_deleted_count = manifest_ns + .batch_delete_table_versions_by_ranges(&table_ranges) + .await?; // Phase 2: Delete physical manifest files (best-effort). // Even if some file deletions fail, the versions are already removed from @@ -3303,7 +3299,7 @@ impl LanceNamespace for DirectoryNamespace { // Direct path: delete physical files (no __manifest). Reached when storage // tracking is off, or for any branch (which has no __manifest entries). - total_deleted_count = self + let total_deleted_count = self .delete_physical_version_files(&table_entries, false, branch) .await?; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 0e22f1e8b69..a86f5209c6e 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -7,27 +7,37 @@ //! to track tables and nested namespaces. use arrow::array::builder::{ListBuilder, StringBuilder}; -use arrow::array::{Array, RecordBatch, RecordBatchIterator, StringArray}; -use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; +use arrow::array::{Array, ListArray, RecordBatch, RecordBatchIterator, StringArray, UInt64Array}; +use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef}; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; -use futures::{FutureExt, TryStreamExt, stream::StreamExt}; -use lance::dataset::optimize::{CompactionOptions, compact_files}; +use datafusion_common::DataFusionError; +use datafusion_physical_plan::{ + SendableRecordBatchStream, + stream::RecordBatchStreamAdapter as DatafusionRecordBatchStreamAdapter, +}; +use futures::{ + FutureExt, TryStreamExt, + stream::{self, StreamExt}, +}; +use lance::dataset::index::LanceIndexStoreExt; +use lance::dataset::transaction::{Operation, Transaction}; use lance::dataset::{ - DeleteBuilder, MergeInsertBuilder, ReadParams, WhenMatched, WhenNotMatched, WriteMode, - WriteParams, builder::DatasetBuilder, + InsertBuilder, ReadParams, WhenMatched, WriteMode, WriteParams, builder::DatasetBuilder, }; -use lance::index::DatasetIndexExt; use lance::session::Session; use lance::{Dataset, dataset::scanner::Scanner}; use lance_core::Error as LanceError; use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; -use lance_core::{Error, Result}; -use lance_index::IndexType; -use lance_index::optimize::OptimizeOptions; -use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; +use lance_core::{Error, ROW_ID, Result}; +use lance_index::progress::noop_progress; +use lance_index::registry::IndexPluginRegistry; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::registry::VALUE_COLUMN_NAME; +use lance_index::scalar::{BuiltinIndexType, CreatedIndex, ScalarIndexParams}; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; +use lance_io::stream::RecordBatchStream as LanceRecordBatchStream; use lance_namespace::LanceNamespace; use lance_namespace::error::NamespaceError; use lance_namespace::models::{ @@ -41,17 +51,27 @@ use lance_namespace::models::{ TableVersion, }; use lance_namespace::schema::arrow_schema_to_json; +use lance_table::feature_flags::apply_feature_flags; +use lance_table::format::{Fragment, IndexMetadata, Manifest}; +use lance_table::io::commit::{ + CommitError, CommitHandler, commit_handler_from_url, write_manifest_file_to_path, +}; use object_store::{Error as ObjectStoreError, path::Path}; +use roaring::RoaringBitmap; use std::io::Cursor; +use std::time::{SystemTime, UNIX_EPOCH}; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Mutex as StdMutex, MutexGuard as StdMutexGuard}, }; use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use uuid::Uuid; const MANIFEST_TABLE_NAME: &str = "__manifest"; +const LANCE_DATA_DIR: &str = "data"; +const LANCE_INDICES_DIR: &str = "_indices"; const DELIMITER: &str = "$"; /// Bounded concurrency for per-table `_versions/` probes when filtering declared tables. /// Higher values reduce latency but increase burst load against the object store. @@ -64,9 +84,10 @@ const OBJECT_ID_INDEX_NAME: &str = "object_id_btree"; const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap"; /// LabelList index on the base_objects column for view dependencies const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list"; -/// Inline maintenance on the manifest table is expensive relative to a single-row mutation. -/// Wait until enough fragments accumulate before compacting files or merging indices. -const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8; +// Each retry reloads and rewrites the full manifest. Match the regular Lance +// commit retry budget so multi-process namespace writes can make progress. +const DEFAULT_MANIFEST_REWRITE_COMMIT_RETRIES: u32 = 20; +const MANIFEST_INDEX_BATCH_SIZE: usize = 8192; /// Object types that can be stored in the manifest #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -77,7 +98,7 @@ pub enum ObjectType { } impl ObjectType { - pub fn as_str(&self) -> &str { + pub fn as_str(&self) -> &'static str { match self { Self::Namespace => "namespace", Self::Table => "table", @@ -160,6 +181,484 @@ pub struct ManifestEntry { pub metadata: Option, } +struct CopyOnWriteMutation { + result: T, + has_changes: bool, +} + +impl CopyOnWriteMutation { + fn updated(result: T) -> Self { + Self { + result, + has_changes: true, + } + } + + fn unchanged(result: T) -> Self { + Self { + result, + has_changes: false, + } + } +} + +struct ManifestIndexBuildInput { + index_name: &'static str, + column_name: &'static str, + params: ScalarIndexParams, + field: Field, + stream: SendableRecordBatchStream, +} + +struct ManifestTrainedIndex { + index_name: &'static str, + column_name: &'static str, + uuid: Uuid, + created_index: CreatedIndex, +} + +struct ManifestRowValue { + object_id: String, + object_type: ObjectType, + location: Option, + metadata: Option, + base_objects: Option>, +} + +struct ManifestOutputRow<'a> { + object_id: &'a str, + object_type: ObjectType, + location: Option<&'a str>, + metadata: Option<&'a str>, + base_objects: Option<&'a [String]>, +} + +#[derive(Default)] +struct ManifestIndexAccumulator { + object_ids: BTreeMap, u64>, + object_types: BTreeMap<&'static str, RoaringBitmap>, + base_objects_values: Vec>>, + base_objects_row_ids: Vec, + row_count: u64, +} + +impl ManifestIndexAccumulator { + fn next_row_id(&self) -> Result { + if self.row_count >= u64::from(u32::MAX) { + return Err(NamespaceError::Internal { + message: format!( + "Manifest rewrite exceeded maximum single-fragment row count: {}", + self.row_count + ), + } + .into()); + } + Ok(self.row_count) + } + + fn push(&mut self, row: &ManifestOutputRow<'_>) -> Result { + let row_id = self.next_row_id()?; + if self + .object_ids + .insert(Arc::::from(row.object_id), row_id) + .is_some() + { + return Err(NamespaceError::Internal { + message: format!("Manifest contains duplicate object_id '{}'", row.object_id), + } + .into()); + } + self.object_types + .entry(row.object_type.as_str()) + .or_default() + .insert(row_id as u32); + self.base_objects_values + .push(row.base_objects.map(|objects| objects.to_vec())); + self.base_objects_row_ids.push(row_id); + self.row_count += 1; + Ok(row_id) + } +} + +struct ManifestBatchBuilder { + object_ids: Vec, + object_types: Vec<&'static str>, + locations: Vec>, + metadatas: Vec>, + base_objects: Vec>>, +} + +impl ManifestBatchBuilder { + fn new() -> Self { + Self { + object_ids: Vec::new(), + object_types: Vec::new(), + locations: Vec::new(), + metadatas: Vec::new(), + base_objects: Vec::new(), + } + } + + fn is_empty(&self) -> bool { + self.object_ids.is_empty() + } + + fn append( + &mut self, + index_data: &mut ManifestIndexAccumulator, + row: ManifestOutputRow<'_>, + ) -> Result<()> { + index_data.push(&row)?; + self.object_ids.push(row.object_id.to_string()); + self.object_types.push(row.object_type.as_str()); + self.locations.push(row.location.map(ToString::to_string)); + self.metadatas.push(row.metadata.map(ToString::to_string)); + self.base_objects + .push(row.base_objects.map(|objects| objects.to_vec())); + Ok(()) + } + + fn finish(self) -> Result { + let base_objects_array = ManifestNamespace::base_objects_array(&self.base_objects); + RecordBatch::try_new( + ManifestNamespace::manifest_schema(), + vec![ + Arc::new(StringArray::from(self.object_ids)), + Arc::new(StringArray::from(self.object_types)), + Arc::new(StringArray::from(self.locations)), + Arc::new(StringArray::from(self.metadatas)), + Arc::new(base_objects_array), + ], + ) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to create manifest snapshot batch: {:?}", e), + }) + }) + } +} + +/// How to resolve a storage commit conflict (or an ambiguous commit error that did +/// not land) against the latest catalog state, without re-staging the full rewrite. +enum ConflictResolution { + /// Re-read the latest manifest and re-apply the mutation (upserts, version-range + /// deletes). The staged data/index files are discarded and a new rewrite is attempted. + Retry, + /// Creating these object ids with fail-on-conflict semantics. If any of them now + /// exists in the latest manifest, the create lost the race and must fail with a + /// concurrent-modification error; otherwise retry the rewrite. + FailIfExists(Vec), + /// Deleting `object_id`. If it is already absent from the latest manifest the delete + /// has effectively happened, so return `output` as success; otherwise retry. + SucceedIfAbsent { object_id: String, output: O }, +} + +trait ManifestStreamMutation: Send { + type Output: Clone + Send + 'static; + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()>; + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()>; + + fn finish(&self) -> CopyOnWriteMutation; + + /// Declares how a storage commit conflict should be resolved against the latest + /// committed catalog state. Defaults to re-reading and re-applying. + fn conflict_resolution(&self) -> ConflictResolution { + ConflictResolution::Retry + } +} + +struct ManifestRewriteShared { + mutation: M, + index_data: Option, + result: Option>, + error: Option, +} + +impl ManifestRewriteShared { + fn new(mutation: M) -> Self { + Self { + mutation, + index_data: Some(ManifestIndexAccumulator::default()), + result: None, + error: None, + } + } +} + +struct UpsertManifestMutation { + entries: Vec, + base_objects: Vec>>, + entry_positions: HashMap, + matched: Vec, + when_matched: WhenMatched, +} + +impl UpsertManifestMutation { + fn new( + entries: Vec, + base_objects: Option>, + when_matched: WhenMatched, + ) -> Self { + let entry_positions = entries + .iter() + .enumerate() + .map(|(index, entry)| (entry.object_id.clone(), index)) + .collect(); + let matched = vec![false; entries.len()]; + let mut entry_base_objects = vec![None; entries.len()]; + if !entry_base_objects.is_empty() { + entry_base_objects[0] = base_objects; + } + Self { + entries, + base_objects: entry_base_objects, + entry_positions, + matched, + when_matched, + } + } + + fn entry_row(&self, index: usize) -> ManifestOutputRow<'_> { + let entry = &self.entries[index]; + ManifestOutputRow { + object_id: &entry.object_id, + object_type: entry.object_type, + location: entry.location.as_deref(), + metadata: entry.metadata.as_deref(), + base_objects: self.base_objects[index].as_deref(), + } + } +} + +impl ManifestStreamMutation for UpsertManifestMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if let Some(index) = self.entry_positions.get(&row.object_id).copied() { + match self.when_matched { + WhenMatched::Fail => { + return Err(NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently created by another operation", + row.object_id + ), + } + .into()); + } + WhenMatched::UpdateAll => { + self.matched[index] = true; + output.append(index_data, self.entry_row(index))?; + return Ok(()); + } + _ => { + return Err(NamespaceError::Internal { + message: format!( + "Unsupported manifest rewrite matched action: {:?}", + self.when_matched + ), + } + .into()); + } + } + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + base_objects: row.base_objects.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + for index in 0..self.entries.len() { + if !self.matched[index] { + output.append(index_data, self.entry_row(index))?; + } + } + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + CopyOnWriteMutation::updated(()) + } + + fn conflict_resolution(&self) -> ConflictResolution { + match self.when_matched { + // Fail-on-conflict create: a concurrent writer may have created one of these + // ids. Re-applying would still fail, so check directly instead of re-staging. + WhenMatched::Fail => ConflictResolution::FailIfExists( + self.entries.iter().map(|e| e.object_id.clone()).collect(), + ), + // Metadata upsert is last-writer-wins: re-read and re-apply. + _ => ConflictResolution::Retry, + } + } +} + +struct DeleteObjectMutation { + object_id: String, + deleted: bool, +} + +impl ManifestStreamMutation for DeleteObjectMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if row.object_id == self.object_id { + self.deleted = true; + return Ok(()); + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + base_objects: row.base_objects.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + _output: &mut ManifestBatchBuilder, + _index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + if self.deleted { + CopyOnWriteMutation::updated(()) + } else { + CopyOnWriteMutation::unchanged(()) + } + } + + fn conflict_resolution(&self) -> ConflictResolution { + // If a concurrent writer already removed the object, the delete is satisfied. + ConflictResolution::SucceedIfAbsent { + object_id: self.object_id.clone(), + output: (), + } + } +} + +enum DeleteTableVersionsTarget { + ObjectIds(HashSet), + Ranges(Vec), +} + +#[derive(Clone)] +struct DeleteTableVersionRangeTarget { + object_id_prefix: String, + ranges: Vec<(i64, i64)>, +} + +impl DeleteTableVersionRangeTarget { + fn matches(&self, object_id: &str) -> bool { + let Some(version) = object_id + .strip_prefix(&self.object_id_prefix) + .and_then(|suffix| suffix.parse::().ok()) + else { + return false; + }; + + self.ranges + .iter() + .any(|(start, end)| *start <= version && version <= *end) + } +} + +impl DeleteTableVersionsTarget { + fn matches(&self, object_id: &str) -> bool { + match self { + Self::ObjectIds(object_ids) => object_ids.contains(object_id), + Self::Ranges(targets) => targets.iter().any(|target| target.matches(object_id)), + } + } +} + +struct DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget, + deleted_count: i64, +} + +impl ManifestStreamMutation for DeleteTableVersionsMutation { + type Output = i64; + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if row.object_type == ObjectType::TableVersion && self.target.matches(&row.object_id) { + self.deleted_count += 1; + return Ok(()); + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + base_objects: row.base_objects.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + _output: &mut ManifestBatchBuilder, + _index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + if self.deleted_count > 0 { + CopyOnWriteMutation::updated(self.deleted_count) + } else { + CopyOnWriteMutation::unchanged(0) + } + } +} + /// Information about a namespace stored in the manifest #[derive(Debug, Clone)] pub struct NamespaceInfo { @@ -190,6 +689,14 @@ impl DatasetConsistencyWrapper { }) } + /// Reload the dataset and return a reference. + pub async fn get_refreshed(&self) -> Result> { + self.reload().await?; + Ok(DatasetReadGuard { + guard: self.0.read().await, + }) + } + /// Get a mutable reference to the dataset. /// Always reloads to ensure strong consistency. pub async fn get_mut(&self) -> Result> { @@ -306,8 +813,8 @@ pub struct ManifestNamespace { /// If true, root namespace tables use {table_name}.lance naming /// If false, they use namespace-prefixed names dir_listing_enabled: bool, - /// Whether to perform inline optimization (compaction and indexing) on the __manifest table - /// after every write. Defaults to true. + /// Whether copy-on-write manifest rewrites should build replacement indices. + /// Defaults to true. inline_optimization_enabled: bool, /// Number of retries for commit operations on the manifest table. /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20). @@ -493,6 +1000,32 @@ impl ManifestNamespace { ) } + fn build_version_object_id_prefix(table_object_id: &str) -> String { + format!("{}{}", table_object_id, DELIMITER) + } + + fn normalize_table_version_ranges(ranges: &[(i64, i64)]) -> Vec<(i64, i64)> { + let mut normalized = ranges + .iter() + .filter_map(|(start, end)| (*start <= *end).then_some((*start, *end))) + .collect::>(); + normalized.sort_unstable(); + + let mut merged: Vec<(i64, i64)> = Vec::with_capacity(normalized.len()); + for (start, end) in normalized { + let Some((_last_start, last_end)) = merged.last_mut() else { + merged.push((start, end)); + continue; + }; + if start <= *last_end + 1 { + *last_end = (*last_end).max(end); + continue; + } + merged.push((start, end)); + } + merged + } + /// Parse a version number from the version suffix of a table version object_id. /// /// The object_id is formatted as `{table_id}${zero_padded_version}`. @@ -556,165 +1089,389 @@ impl ManifestNamespace { Ok(full_url.to_string()) } - /// Perform inline optimization on the __manifest table. - /// - /// This method: - /// 1. Creates three indexes on the manifest table: - /// - BTREE index on object_id for fast lookups - /// - Bitmap index on object_type for filtering by type - /// - LabelList index on base_objects for view dependencies - /// 2. Runs file compaction to merge small files - /// 3. Optimizes existing indices - /// - /// This is called automatically after writes when inline_optimization_enabled is true. - async fn run_inline_optimization(&self) -> Result<()> { - if !self.inline_optimization_enabled { - return Ok(()); + fn string_list_array(values: &[Option>], child_name: &str) -> ListArray { + let string_builder = StringBuilder::new(); + let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new( + child_name, + DataType::Utf8, + true, + ))); + for value in values { + match value { + Some(objects) => { + for object in objects { + list_builder.values().append_value(object); + } + list_builder.append(true); + } + None => list_builder.append_null(), + } } + list_builder.finish() + } - // Get a mutable reference to the dataset to perform optimization - let mut dataset_guard = self.manifest_dataset.get_mut().await?; - let dataset: &mut Dataset = &mut dataset_guard; + fn base_objects_array(values: &[Option>]) -> ListArray { + Self::string_list_array(values, "object_id") + } - // Step 1: Create indexes if they don't already exist - let indices = dataset.load_indices().await?; + fn value_row_id_schema(value_field: Field) -> SchemaRef { + Arc::new(ArrowSchema::new(vec![ + value_field, + Field::new(ROW_ID, DataType::UInt64, false), + ])) + } - // Check which indexes already exist - let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME); - let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME); - let has_base_objects_index = indices - .iter() - .any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME); + fn string_row_id_batch( + schema: SchemaRef, + values: Vec, + row_ids: Vec, + ) -> Result { + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(values)), + Arc::new(UInt64Array::from(row_ids)), + ], + ) + .map_err(Into::into) + } - // Create BTREE index on object_id - if !has_object_id_index { - log::debug!( - "Creating BTREE index '{}' on object_id for __manifest table", - OBJECT_ID_INDEX_NAME - ); - let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree); - if let Err(e) = dataset - .create_index( - &["object_id"], - IndexType::BTree, - Some(OBJECT_ID_INDEX_NAME.to_string()), - ¶ms, - true, - ) - .await - { - log::warn!( - "Failed to create BTREE index on object_id for __manifest table: {:?}. Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created BTREE index '{}' on object_id for __manifest table", - OBJECT_ID_INDEX_NAME - ); - } - } + fn list_row_id_batch( + schema: SchemaRef, + values: Vec>>, + row_ids: Vec, + ) -> Result { + RecordBatch::try_new( + schema, + vec![ + Arc::new(Self::string_list_array(&values, "item")), + Arc::new(UInt64Array::from(row_ids)), + ], + ) + .map_err(Into::into) + } - // Create Bitmap index on object_type - if !has_object_type_index { - log::debug!( - "Creating Bitmap index '{}' on object_type for __manifest table", - OBJECT_TYPE_INDEX_NAME - ); - let params = ScalarIndexParams::default(); - if let Err(e) = dataset - .create_index( - &["object_type"], - IndexType::Bitmap, - Some(OBJECT_TYPE_INDEX_NAME.to_string()), - ¶ms, - true, - ) - .await - { - log::warn!( - "Failed to create Bitmap index on object_type for __manifest table: {:?}. Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created Bitmap index '{}' on object_type for __manifest table", - OBJECT_TYPE_INDEX_NAME - ); - } - } + fn object_id_index_stream(object_ids: BTreeMap, u64>) -> SendableRecordBatchStream { + let schema = + Self::value_row_id_schema(Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false)); + let stream_schema = schema.clone(); + let stream = stream::unfold( + (object_ids.into_iter(), false, schema), + |(mut iter, emitted, schema)| async move { + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + for _ in 0..MANIFEST_INDEX_BATCH_SIZE { + let Some((value, row_id)) = iter.next() else { + break; + }; + values.push(value.to_string()); + row_ids.push(row_id); + } + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } - // Create LabelList index on base_objects - if !has_base_objects_index { - log::debug!( - "Creating LabelList index '{}' on base_objects for __manifest table", - BASE_OBJECTS_INDEX_NAME - ); - let params = ScalarIndexParams::default(); - if let Err(e) = dataset - .create_index( - &["base_objects"], - IndexType::LabelList, - Some(BASE_OBJECTS_INDEX_NAME.to_string()), - ¶ms, - true, + fn object_type_index_stream( + object_types: BTreeMap<&'static str, RoaringBitmap>, + ) -> SendableRecordBatchStream { + let schema = + Self::value_row_id_schema(Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false)); + let stream_schema = schema.clone(); + let entries = object_types + .into_iter() + .map(|(value, bitmap)| { + ( + value, + Box::new(bitmap.into_iter()) as Box + Send>, ) - .await - { - log::warn!( - "Failed to create LabelList index on base_objects for __manifest table: {:?}. Query performance may be impacted.", - e - ); - } else { - log::info!( - "Created LabelList index '{}' on base_objects for __manifest table", - BASE_OBJECTS_INDEX_NAME - ); - } - } + }) + .collect::>() + .into_iter(); + let stream = stream::unfold( + (entries, None, false, schema), + |(mut entries, mut current, emitted, schema)| async move { + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + while values.len() < MANIFEST_INDEX_BATCH_SIZE { + if current.is_none() { + current = entries.next(); + } + let Some((value, iter)) = current.as_mut() else { + break; + }; + if let Some(row_id) = iter.next() { + values.push((*value).to_string()); + row_ids.push(u64::from(row_id)); + } else { + current = None; + } + } - let should_compact_and_optimize = - dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD; + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (entries, current, true, schema))) + } + } else { + let batch = Self::string_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (entries, current, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } - if !should_compact_and_optimize { - return Ok(()); + fn base_objects_index_stream( + base_objects_values: Vec>>, + base_objects_row_ids: Vec, + ) -> SendableRecordBatchStream { + let schema = Self::value_row_id_schema(Field::new( + VALUE_COLUMN_NAME, + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + )); + let stream_schema = schema.clone(); + let stream = stream::unfold( + ( + base_objects_values.into_iter().zip(base_objects_row_ids), + false, + schema, + ), + |(mut iter, emitted, schema)| async move { + let mut values = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + let mut row_ids = Vec::with_capacity(MANIFEST_INDEX_BATCH_SIZE); + for _ in 0..MANIFEST_INDEX_BATCH_SIZE { + let Some((value, row_id)) = iter.next() else { + break; + }; + values.push(value); + row_ids.push(row_id); + } + if values.is_empty() { + if emitted { + None + } else { + let batch = Self::list_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + } else { + let batch = Self::list_row_id_batch(schema.clone(), values, row_ids) + .map_err(|err| DataFusionError::External(Box::new(err))); + Some((batch, (iter, true, schema))) + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + stream_schema, + stream.fuse(), + )) + } + + async fn train_manifest_index( + dataset: &Dataset, + registry: Arc, + input: ManifestIndexBuildInput, + index_uuid: Uuid, + ) -> Result { + let index_store = LanceIndexStore::from_dataset_for_new(dataset, &index_uuid)?; + let plugin = registry.get_plugin_by_name(&input.params.index_type)?; + let training_request = plugin + .new_training_request(input.params.params.as_deref().unwrap_or("{}"), &input.field)?; + let created_index = plugin + .train_index( + input.stream, + &index_store, + training_request, + None, + noop_progress(), + ) + .await?; + Ok(ManifestTrainedIndex { + index_name: input.index_name, + column_name: input.column_name, + uuid: index_uuid, + created_index, + }) + } + + fn manifest_index_metadata( + lance_schema: &lance_core::datatypes::Schema, + fragment_bitmap: &RoaringBitmap, + dataset_version: u64, + trained_index: ManifestTrainedIndex, + ) -> Result { + Ok(IndexMetadata { + uuid: trained_index.uuid, + fields: vec![lance_schema.field_id(trained_index.column_name)?], + name: trained_index.index_name.to_string(), + dataset_version, + fragment_bitmap: Some(fragment_bitmap.clone()), + index_details: Some(Arc::new(trained_index.created_index.index_details)), + index_version: trained_index.created_index.index_version as i32, + created_at: None, + base_id: None, + files: Some(trained_index.created_index.files), + }) + } + + fn manifest_fragment_bitmap(manifest: &Manifest) -> Result { + let mut bitmap = RoaringBitmap::new(); + for fragment in manifest.fragments.iter() { + let fragment_id = u32::try_from(fragment.id).map_err(|_| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Manifest fragment id {} exceeds u32", fragment.id), + }) + })?; + bitmap.insert(fragment_id); } + Ok(bitmap) + } - // Step 2: Run file compaction - log::debug!("Running file compaction on __manifest table"); - match compact_files(dataset, CompactionOptions::default(), None).await { - Ok(compaction_metrics) => { - if compaction_metrics.fragments_removed > 0 { - log::info!( - "Compacted __manifest table: removed {} fragments, added {} fragments", - compaction_metrics.fragments_removed, - compaction_metrics.fragments_added - ); + fn manifest_from_overwrite_transaction( + previous: &Manifest, + schema: lance_core::datatypes::Schema, + fragments: &[Fragment], + ) -> Manifest { + let mut next_fragment_id = 0; + let mut fragments = fragments + .iter() + .cloned() + .map(|mut fragment| { + if fragment.id == 0 { + fragment.id = next_fragment_id; + next_fragment_id += 1; } - } - Err(e) => { - log::warn!( - "Failed to compact files for __manifest table: {:?}. Continuing with optimization.", - e - ); - } - } + fragment + }) + .collect::>(); + fragments.sort_by_key(|fragment| fragment.id); + Manifest::new_from_previous(previous, schema, Arc::new(fragments)) + } - // Step 3: Optimize indices - log::debug!("Optimizing indices on __manifest table"); - match dataset.optimize_indices(&OptimizeOptions::default()).await { - Ok(_) => { - log::info!("Successfully optimized indices on __manifest table"); - } - Err(e) => { - log::warn!( - "Failed to optimize indices on __manifest table: {:?}. Continuing anyway.", - e - ); - } - } + async fn build_manifest_indices( + dataset: &Dataset, + manifest: &Manifest, + index_data: ManifestIndexAccumulator, + index_uuids: [Uuid; 3], + ) -> Result> { + let fragment_bitmap = Self::manifest_fragment_bitmap(manifest)?; + let schema = &manifest.schema; + let ManifestIndexAccumulator { + object_ids, + object_types, + base_objects_values, + base_objects_row_ids, + .. + } = index_data; + let [object_id_uuid, object_type_uuid, base_objects_uuid] = index_uuids; + let registry = IndexPluginRegistry::with_default_plugins(); + + let dataset_version = manifest.version; + let object_id_index_fut = Self::build_manifest_index( + dataset, + registry.clone(), + schema, + ManifestIndexBuildInput { + index_name: OBJECT_ID_INDEX_NAME, + column_name: "object_id", + params: ScalarIndexParams::for_builtin(BuiltinIndexType::BTree), + field: Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + stream: Self::object_id_index_stream(object_ids), + }, + &fragment_bitmap, + dataset_version, + object_id_uuid, + ); + let object_type_index_fut = Self::build_manifest_index( + dataset, + registry.clone(), + schema, + ManifestIndexBuildInput { + index_name: OBJECT_TYPE_INDEX_NAME, + column_name: "object_type", + params: ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap), + field: Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + stream: Self::object_type_index_stream(object_types), + }, + &fragment_bitmap, + dataset_version, + object_type_uuid, + ); + let base_objects_index_fut = Self::build_manifest_index( + dataset, + registry, + schema, + ManifestIndexBuildInput { + index_name: BASE_OBJECTS_INDEX_NAME, + column_name: "base_objects", + params: ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList), + field: Field::new( + VALUE_COLUMN_NAME, + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + stream: Self::base_objects_index_stream(base_objects_values, base_objects_row_ids), + }, + &fragment_bitmap, + dataset_version, + base_objects_uuid, + ); - Ok(()) + let (object_id_index, object_type_index, base_objects_index) = futures::join!( + object_id_index_fut, + object_type_index_fut, + base_objects_index_fut + ); + + Ok(vec![ + object_id_index?, + object_type_index?, + base_objects_index?, + ]) + } + + async fn build_manifest_index( + dataset: &Dataset, + registry: Arc, + lance_schema: &lance_core::datatypes::Schema, + input: ManifestIndexBuildInput, + fragment_bitmap: &RoaringBitmap, + dataset_version: u64, + index_uuid: Uuid, + ) -> Result { + let trained_index = + Self::train_manifest_index(dataset, registry, input, index_uuid).await?; + Self::manifest_index_metadata( + lance_schema, + fragment_bitmap, + dataset_version, + trained_index, + ) } /// Get the manifest schema @@ -783,6 +1540,594 @@ impl ManifestNamespace { }) } + fn required_string_value<'a>( + array: &'a StringArray, + row: usize, + column_name: &str, + ) -> Result<&'a str> { + if array.is_null(row) { + return Err(NamespaceError::Internal { + message: format!("Manifest column '{}' has null at row {}", column_name, row), + } + .into()); + } + Ok(array.value(row)) + } + + fn optional_string_value(array: &StringArray, row: usize) -> Option { + (!array.is_null(row)).then(|| array.value(row).to_string()) + } + + fn base_objects_column_values(batch: &RecordBatch) -> Result>>> { + let Some(column) = batch.column_by_name("base_objects") else { + return Ok(vec![None; batch.num_rows()]); + }; + let array = column.as_any().downcast_ref::().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: format!( + "Column 'base_objects' is not a list array: {:?}", + column.data_type() + ), + }) + })?; + + let mut values = Vec::with_capacity(batch.num_rows()); + for row in 0..batch.num_rows() { + if array.is_null(row) { + values.push(None); + continue; + } + let row_values = array.value(row); + let row_values = row_values + .as_any() + .downcast_ref::() + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Column 'base_objects' values are not strings".to_string(), + }) + })?; + let mut objects = Vec::with_capacity(row_values.len()); + for value_index in 0..row_values.len() { + if row_values.is_null(value_index) { + return Err(NamespaceError::Internal { + message: format!( + "Manifest column 'base_objects' has null item at row {} item {}", + row, value_index + ), + } + .into()); + } + objects.push(row_values.value(value_index).to_string()); + } + values.push(Some(objects)); + } + Ok(values) + } + + async fn manifest_projected_stream(dataset: &Dataset) -> Result { + let mut scanner = dataset.scan(); + scanner + .project(&[ + "object_id", + "object_type", + "location", + "metadata", + "base_objects", + ]) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to project manifest columns: {:?}", e), + }) + })?; + let stream = scanner.try_into_stream().await.map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to create manifest stream: {:?}", e), + }) + })?; + let schema = stream.schema(); + let stream = stream.map_err(|err| DataFusionError::External(Box::new(err))); + Ok(Box::pin(DatafusionRecordBatchStreamAdapter::new( + schema, + stream.fuse(), + ))) + } + + fn manifest_rewrite_commit_retries(&self) -> u32 { + self.commit_retries + .unwrap_or(DEFAULT_MANIFEST_REWRITE_COMMIT_RETRIES) + } + + fn lock_manifest_rewrite_shared( + shared: &Arc>>, + ) -> Result>> { + shared.lock().map_err(|_| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite state mutex was poisoned".to_string(), + }) + }) + } + + fn set_manifest_rewrite_error( + shared: &Arc>>, + err: LanceError, + ) { + match shared.lock() { + Ok(mut guard) => { + guard.error = Some(err); + } + Err(poisoned) => { + let mut guard = poisoned.into_inner(); + guard.error = Some(err); + } + } + } + + fn take_manifest_rewrite_error( + shared: &Arc>>, + ) -> Result> { + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + Ok(guard.error.take()) + } + + fn process_manifest_rewrite_batch( + batch: RecordBatch, + shared: &Arc>>, + ) -> Result> { + let object_ids = Self::get_string_column(&batch, "object_id")?; + let object_types = Self::get_string_column(&batch, "object_type")?; + let locations = Self::get_string_column(&batch, "location")?; + let metadatas = Self::get_string_column(&batch, "metadata")?; + let base_objects = Self::base_objects_column_values(&batch)?; + let mut output = ManifestBatchBuilder::new(); + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let mut index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), + }) + })?; + for (row, base_objects) in base_objects.into_iter().enumerate().take(batch.num_rows()) { + let row_value = ManifestRowValue { + object_id: Self::required_string_value(object_ids, row, "object_id")?.to_string(), + object_type: ObjectType::parse(Self::required_string_value( + object_types, + row, + "object_type", + )?)?, + location: Self::optional_string_value(locations, row), + metadata: Self::optional_string_value(metadatas, row), + base_objects, + }; + guard + .mutation + .process_existing_row(row_value, &mut output, &mut index_data)?; + } + guard.index_data = Some(index_data); + if output.is_empty() { + return Ok(None); + } + Ok(Some(output.finish()?)) + } + + fn finish_manifest_rewrite_stream( + shared: &Arc>>, + ) -> Result> { + let mut output = ManifestBatchBuilder::new(); + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let mut index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), + }) + })?; + guard.mutation.append_rows(&mut output, &mut index_data)?; + let result = guard.mutation.finish(); + let force_empty_batch = index_data.row_count == 0; + guard.result = Some(result); + guard.index_data = Some(index_data); + if output.is_empty() && !force_empty_batch { + Ok(None) + } else { + Ok(Some(output.finish()?)) + } + } + + fn manifest_rewrite_output_stream( + source: SendableRecordBatchStream, + shared: Arc>>, + ) -> SendableRecordBatchStream { + enum Phase { + Source, + Finish, + Done, + } + + let schema = Self::manifest_schema(); + let stream = stream::unfold( + (source, shared, Phase::Source), + |(mut source, shared, mut phase)| async move { + loop { + match phase { + Phase::Source => match source.next().await { + Some(Ok(batch)) => { + match Self::process_manifest_rewrite_batch(batch, &shared) { + Ok(Some(batch)) => { + return Some((Ok(batch), (source, shared, phase))); + } + Ok(None) => continue, + Err(err) => { + let message = err.to_string(); + Self::set_manifest_rewrite_error(&shared, err); + return Some(( + Err(DataFusionError::External(Box::new( + std::io::Error::other(message), + ))), + (source, shared, Phase::Done), + )); + } + } + } + Some(Err(err)) => { + return Some((Err(err), (source, shared, Phase::Done))); + } + None => phase = Phase::Finish, + }, + Phase::Finish => { + phase = Phase::Done; + match Self::finish_manifest_rewrite_stream(&shared) { + Ok(Some(batch)) => { + return Some((Ok(batch), (source, shared, phase))); + } + Ok(None) => continue, + Err(err) => { + let message = err.to_string(); + Self::set_manifest_rewrite_error(&shared, err); + return Some(( + Err(DataFusionError::External(Box::new( + std::io::Error::other(message), + ))), + (source, shared, Phase::Done), + )); + } + } + } + Phase::Done => return None, + } + } + }, + ); + Box::pin(DatafusionRecordBatchStreamAdapter::new( + schema, + stream.fuse(), + )) + } + + fn take_manifest_rewrite_result( + shared: &Arc>>, + ) -> Result<(CopyOnWriteMutation, ManifestIndexAccumulator)> { + let mut guard = Self::lock_manifest_rewrite_shared(shared)?; + let result = guard.result.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite stream did not finish".to_string(), + }) + })?; + let index_data = guard.index_data.take().ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: "Manifest rewrite index state is unavailable".to_string(), + }) + })?; + Ok((result, index_data)) + } + + /// Delete the staged (uncommitted) data files and index directories for a rewrite. + /// Only call this once the rewrite is known *not* to have landed (a put-if-not-exists + /// conflict, or an ambiguous error whose target version does not reference our data + /// file) — otherwise it would orphan files a committed manifest still references. + async fn cleanup_staged_manifest_files( + &self, + data_files: &HashSet, + index_uuids: &[Uuid], + ) { + let data_dir = self + .base_path + .clone() + .join(MANIFEST_TABLE_NAME) + .join(LANCE_DATA_DIR); + for path in data_files { + let data_path = data_dir.clone().join(path.as_str()); + if let Err(err) = self.object_store.delete(&data_path).await { + log::warn!( + "Failed to clean up uncommitted manifest rewrite data file '{}': {}", + data_path, + err + ); + } + } + self.cleanup_uncommitted_manifest_index_dirs(index_uuids.iter().copied()) + .await; + } + + async fn cleanup_uncommitted_manifest_index_dirs( + &self, + index_uuids: impl IntoIterator, + ) { + for index_uuid in index_uuids { + let index_dir = self + .base_path + .clone() + .join(MANIFEST_TABLE_NAME) + .join(LANCE_INDICES_DIR) + .join(index_uuid.to_string()); + if let Err(err) = self.object_store.remove_dir_all(index_dir.clone()).await + && !matches!(err, LanceError::NotFound { .. }) + { + log::warn!( + "Failed to clean up uncommitted manifest rewrite index directory '{}': {}", + index_dir, + err + ); + } + } + } + + /// Resolve the commit handler for the `__manifest` dataset's storage backend. + async fn manifest_commit_handler(&self) -> Result> { + commit_handler_from_url(&self.root, &None) + .await + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to resolve manifest commit handler: {:?}", e), + }) + }) + } + + /// Directly write the rewritten `__manifest` as a new version using the storage + /// backend's atomic put-if-not-exists. The overwrite transaction is embedded inline + /// (no separate transaction file) and the commit handler writes the version hint. + async fn commit_manifest_overwrite( + &self, + dataset: &Dataset, + commit_handler: &dyn CommitHandler, + manifest: &mut Manifest, + indices: Option>, + transaction: Transaction, + ) -> std::result::Result<(), CommitError> { + apply_feature_flags(manifest, false, false).map_err(CommitError::from)?; + let timestamp_nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0); + manifest.set_timestamp(timestamp_nanos); + manifest.update_max_fragment_id(); + + let base_path = self.base_path.clone().join(MANIFEST_TABLE_NAME); + let naming_scheme = dataset.manifest_location().naming_scheme; + commit_handler + .commit( + manifest, + indices, + &base_path, + &self.object_store, + write_manifest_file_to_path, + naming_scheme, + Some((&transaction).into()), + ) + .await + .map(|_location| ()) + } + + /// After an ambiguous commit error, determine whether our overwrite actually landed at + /// `target_version`. A network failure can leave the manifest committed even though the + /// client observed an error; in that case the committed version references one of our + /// staged data files, and deleting them would corrupt the catalog. + async fn manifest_commit_landed( + &self, + dataset: &Dataset, + target_version: u64, + data_files: &HashSet, + ) -> bool { + let Ok(committed) = dataset.checkout_version(target_version).await else { + return false; + }; + committed.manifest().fragments.iter().any(|fragment| { + fragment + .files + .iter() + .any(|file| data_files.contains(file.path.as_str())) + }) + } + + /// Resolve a storage commit conflict against the latest committed catalog state. + /// Returns `Some(output)` when the mutation's intent is already satisfied (no retry + /// needed), `Ok(None)` to retry the rewrite, or an error for a terminal conflict. + async fn resolve_manifest_conflict( + &self, + resolution: &ConflictResolution, + ) -> Result> { + match resolution { + ConflictResolution::Retry => Ok(None), + ConflictResolution::FailIfExists(object_ids) => { + for object_id in object_ids { + if self.manifest_contains_object(object_id).await? { + return Err(NamespaceError::ConcurrentModification { + message: format!( + "Object '{}' was concurrently created by another operation", + object_id + ), + } + .into()); + } + } + Ok(None) + } + ConflictResolution::SucceedIfAbsent { object_id, output } => { + if self.manifest_contains_object(object_id).await? { + Ok(None) + } else { + Ok(Some(output.clone())) + } + } + } + } + + async fn rewrite_manifest( + &self, + operation: &str, + mut make_mutation: F, + ) -> Result + where + M: ManifestStreamMutation + 'static, + F: FnMut() -> M, + { + let _mutation_guard = self.manifest_mutation_lock.lock().await; + let max_retries = self.manifest_rewrite_commit_retries(); + let mut retries = 0; + let build_indices = self.inline_optimization_enabled; + let commit_handler = self.manifest_commit_handler().await?; + + loop { + let dataset_guard = self.manifest_dataset.get_refreshed().await?; + let dataset = Arc::new(dataset_guard.clone()); + drop(dataset_guard); + + let source = Self::manifest_projected_stream(&dataset).await?; + let resolution = make_mutation().conflict_resolution(); + let shared = Arc::new(StdMutex::new(ManifestRewriteShared::new(make_mutation()))); + let output_stream = Self::manifest_rewrite_output_stream(source, shared.clone()); + // Pin both limits so the overwrite never splits into multiple fragments: the + // replacement indices map each row to address `(0 << 32) | offset`, valid only + // for a single fragment with id 0. The row count is bounded below u32::MAX by + // `ManifestIndexAccumulator::next_row_id`. + let write_params = WriteParams { + mode: WriteMode::Overwrite, + session: self.session.clone(), + max_rows_per_file: u32::MAX as usize, + max_bytes_per_file: usize::MAX, + skip_auto_cleanup: true, + ..WriteParams::default() + }; + + let transaction = match InsertBuilder::new(dataset.clone()) + .with_params(&write_params) + .execute_uncommitted_stream(output_stream) + .await + { + Ok(transaction) => transaction, + Err(err) => { + if let Some(stream_err) = Self::take_manifest_rewrite_error(&shared)? { + return Err(stream_err); + } + return Err(convert_lance_commit_error(&err, operation, None)); + } + }; + + let (mutation, index_data) = Self::take_manifest_rewrite_result(&shared)?; + + let Operation::Overwrite { + fragments, schema, .. + } = &transaction.operation + else { + return Err(NamespaceError::Internal { + message: "Manifest rewrite transaction is not an overwrite".to_string(), + } + .into()); + }; + // Unique data files this attempt staged. Used to clean up orphans and to + // attribute an ambiguous commit error back to us. + let staged_data_files = fragments + .iter() + .flat_map(|fragment| fragment.files.iter()) + .filter(|file| file.base_id.is_none()) + .map(|file| file.path.clone()) + .collect::>(); + + if !mutation.has_changes { + self.cleanup_staged_manifest_files(&staged_data_files, &[]) + .await; + return Ok(mutation.result); + } + + let mut manifest = Self::manifest_from_overwrite_transaction( + dataset.manifest(), + schema.clone(), + fragments, + ); + let target_version = manifest.version; + + let index_uuids = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()]; + let indices = if build_indices { + match Self::build_manifest_indices(&dataset, &manifest, index_data, index_uuids) + .await + { + Ok(indices) => Some(indices), + Err(err) => { + self.cleanup_staged_manifest_files(&staged_data_files, &index_uuids) + .await; + return Err(err); + } + } + } else { + None + }; + let staged_index_uuids: &[Uuid] = if build_indices { &index_uuids } else { &[] }; + + let commit_result = self + .commit_manifest_overwrite( + &dataset, + commit_handler.as_ref(), + &mut manifest, + indices, + transaction, + ) + .await; + + match commit_result { + Ok(()) => { + let _ = self.manifest_dataset.get_refreshed().await; + return Ok(mutation.result); + } + Err(err) => { + // The put may have landed even though the client saw an error (lost + // ack). Verify before deleting anything so we never orphan files that a + // committed manifest still references. + if self + .manifest_commit_landed(&dataset, target_version, &staged_data_files) + .await + { + let _ = self.manifest_dataset.get_refreshed().await; + return Ok(mutation.result); + } + self.cleanup_staged_manifest_files(&staged_data_files, staged_index_uuids) + .await; + match err { + CommitError::CommitConflict => { + if let Some(output) = + self.resolve_manifest_conflict(&resolution).await? + { + return Ok(output); + } + if retries >= max_retries { + return Err(NamespaceError::ConcurrentModification { + message: format!( + "{}: still conflicting after {} retries", + operation, max_retries + ), + } + .into()); + } + retries += 1; + tokio::time::sleep(std::time::Duration::from_millis( + 10 * u64::from(retries), + )) + .await; + } + CommitError::OtherError(err) => { + return Err(convert_lance_commit_error(&err, operation, None)); + } + } + } + } + } + } + /// Check if the manifest contains an object with the given ID async fn manifest_contains_object(&self, object_id: &str) -> Result { let escaped_id = object_id.replace('\'', "''"); @@ -999,7 +2344,6 @@ impl ManifestNamespace { /// Insert one or more entries into the manifest table with metadata and base_objects. /// /// This is the unified entry point for both single and batch inserts. - /// Uses a single MergeInsert operation to insert all entries at once. /// If any entry already exists (matching object_id), the entire batch fails. pub async fn insert_into_manifest_with_metadata( &self, @@ -1029,158 +2373,20 @@ impl ManifestNamespace { return Ok(()); } - let schema = Self::manifest_schema(); - - let mut object_ids = Vec::with_capacity(entries.len()); - let mut object_types = Vec::with_capacity(entries.len()); - let mut locations: Vec> = Vec::with_capacity(entries.len()); - let mut metadatas: Vec> = Vec::with_capacity(entries.len()); - - let string_builder = StringBuilder::new(); - let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new( - "object_id", - DataType::Utf8, - true, - ))); - - for (i, entry) in entries.iter().enumerate() { - object_ids.push(entry.object_id.as_str()); - object_types.push(entry.object_type.as_str()); - locations.push(entry.location.clone()); - metadatas.push(entry.metadata.clone()); - - // Only the first entry gets the base_objects (for single-entry inserts - // with base_objects like view creation); batch entries use null. - if i == 0 { - match &base_objects { - Some(objects) => { - for obj in objects { - list_builder.values().append_value(obj); - } - list_builder.append(true); - } - None => { - list_builder.append_null(); - } - } - } else { - list_builder.append_null(); - } - } - - let base_objects_array = list_builder.finish(); - - let location_array: Arc = Arc::new(StringArray::from( - locations.iter().map(|l| l.as_deref()).collect::>(), - )); - - let metadata_array: Arc = Arc::new(StringArray::from( - metadatas.iter().map(|m| m.as_deref()).collect::>(), - )); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(StringArray::from(object_ids)), - Arc::new(StringArray::from(object_types.to_vec())), - location_array, - metadata_array, - Arc::new(base_objects_array), - ], - ) - .map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to create manifest entries: {:?}", e), - }) - })?; - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - - // Use MergeInsert so callers can choose fail-on-existing inserts or metadata upserts. - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset_arc = Arc::new(dataset_guard.clone()); - drop(dataset_guard); // Drop read guard before merge insert - - let mut merge_builder = - MergeInsertBuilder::try_new(dataset_arc, vec!["object_id".to_string()]).map_err( - |e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to create merge builder: {:?}", e), - }) - }, - )?; - merge_builder.when_matched(when_matched); - merge_builder.when_not_matched(WhenNotMatched::InsertAll); - // Use conflict_retries to handle cross-process races on manifest mutations. - merge_builder.conflict_retries(5); - // TODO: after BTREE index creation on object_id, has_scalar_index=true causes - // MergeInsert to use V1 path which lacks bloom filters for conflict detection. This - // results in (Some, None) filter mismatch when rebasing against V2 operations. - // Setting use_index=false ensures all operations consistently use V2 path. - merge_builder.use_index(false); - if let Some(retries) = self.commit_retries { - merge_builder.commit_retries(retries); - } - - let (new_dataset_arc, _merge_stats) = merge_builder - .try_build() - .map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to build merge: {:?}", e), - }) - })? - .execute_reader(Box::new(reader)) - .await - .map_err(|e| { - convert_lance_commit_error(&e, "Failed to execute merge insert into manifest", None) - })?; - - let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone()); - self.manifest_dataset.set_latest(new_dataset).await; - - // Run inline optimization after write - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(()) + self.rewrite_manifest("Failed to overwrite manifest", || { + UpsertManifestMutation::new(entries.clone(), base_objects.clone(), when_matched.clone()) + }) + .await } /// Delete an entry from the manifest table pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> { - let predicate = format!("object_id = '{}'", object_id); - - // Get dataset and use DeleteBuilder with configured retries - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = Arc::new(dataset_guard.clone()); - drop(dataset_guard); // Drop read guard before delete - - let new_dataset = DeleteBuilder::new(dataset, &predicate) - .execute() - .await - .map_err(|e| convert_lance_commit_error(&e, "Failed to delete", None))?; - - // Update the wrapper with the new dataset - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; - - // Run inline optimization after delete - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(()) + let object_id = object_id.to_string(); + self.rewrite_manifest("Failed to delete from manifest", || DeleteObjectMutation { + object_id: object_id.clone(), + deleted: false, + }) + .await } /// Query the manifest for all versions of a table, sorted by version. @@ -1302,90 +2508,55 @@ impl ManifestNamespace { /// `object_type = 'table_version'` entries whose object_id matches /// `{object_id}${zero_padded_version}`. /// - /// Builds a single filter expression covering all version ranges and executes - /// one bulk delete operation instead of deleting versions one at a time. + /// Applies the ranges while streaming the manifest rewrite, without expanding + /// sparse ranges into every possible version object id. pub async fn delete_table_versions( &self, object_id: &str, ranges: &[(i64, i64)], ) -> Result { - if ranges.is_empty() { - return Ok(0); - } - - // Collect all object_ids to delete (both new zero-padded and legacy formats) - let mut object_id_conditions: Vec = Vec::new(); - for (start, end) in ranges { - for version in *start..=*end { - let oid = Self::build_version_object_id(object_id, version); - let escaped = oid.replace('\'', "''"); - object_id_conditions.push(format!("'{}'", escaped)); - } - } - - if object_id_conditions.is_empty() { - return Ok(0); - } - - // First, count how many entries exist so we can report the deleted count - let in_list = object_id_conditions.join(", "); - let filter = format!( - "object_type = 'table_version' AND object_id IN ({})", - in_list - ); + self.batch_delete_table_versions_by_ranges(&[(object_id.to_string(), ranges.to_vec())]) + .await + } - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["object_id", "location"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), + /// Atomically delete table version entries from the manifest for multiple + /// tables and version ranges. + pub async fn batch_delete_table_versions_by_ranges( + &self, + table_ranges: &[(String, Vec<(i64, i64)>)], + ) -> Result { + let targets = table_ranges + .iter() + .filter_map(|(object_id, ranges)| { + let ranges = Self::normalize_table_version_ranges(ranges); + if ranges.is_empty() { + None + } else { + Some(DeleteTableVersionRangeTarget { + object_id_prefix: Self::build_version_object_id_prefix(object_id), + ranges, + }) + } }) - })?; - let batches = Self::execute_scanner(scanner).await?; - let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); - - if deleted_count == 0 { + .collect::>(); + if targets.is_empty() { return Ok(0); } - // Execute a single bulk delete with the combined filter - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = Arc::new(dataset_guard.clone()); - drop(dataset_guard); - - let new_dataset = DeleteBuilder::new(dataset, &filter) - .execute() - .await - .map_err(|e| { - convert_lance_commit_error(&e, "Failed to batch delete table versions", None) - })?; - - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; - - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(deleted_count) + self.rewrite_manifest("Failed to delete table versions from manifest", || { + DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::Ranges(targets.clone()), + deleted_count: 0, + } + }) + .await } /// Atomically delete table version entries from the manifest by their object_ids. /// /// This method supports multi-table transactional deletion: all specified /// object_ids (which may span multiple tables) are deleted in a single atomic - /// `DeleteBuilder` operation. Either all entries are removed or none are. + /// copy-on-write manifest rewrite. Either all entries are removed or none are. /// /// Object IDs are formatted as `{table_id}${version}`. pub async fn batch_delete_table_versions_by_object_ids( @@ -1396,70 +2567,14 @@ impl ManifestNamespace { return Ok(0); } - let in_list: String = object_ids - .iter() - .map(|oid| { - let escaped = oid.replace('\'', "''"); - format!("'{}'", escaped) - }) - .collect::>() - .join(", "); - - let filter = format!( - "object_type = 'table_version' AND object_id IN ({})", - in_list - ); - - // Count how many entries exist so we can report the deleted count - let mut scanner = self.manifest_scanner().await?; - scanner.filter(&filter).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to filter: {:?}", e), - }) - })?; - scanner.project(&["object_id", "location"]).map_err(|e| { - lance_core::Error::from(NamespaceError::Internal { - message: format!("Failed to project: {:?}", e), - }) - })?; - let batches = Self::execute_scanner(scanner).await?; - let deleted_count: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); - - if deleted_count == 0 { - return Ok(0); - } - - // Execute a single atomic bulk delete covering all tables - let _mutation_guard = self.manifest_mutation_lock.lock().await; - let dataset_guard = self.manifest_dataset.get().await?; - let dataset = Arc::new(dataset_guard.clone()); - drop(dataset_guard); - - let new_dataset = DeleteBuilder::new(dataset, &filter) - .execute() - .await - .map_err(|e| { - convert_lance_commit_error( - &e, - "Failed to batch delete table versions across multiple tables", - None, - ) - })?; - - self.manifest_dataset - .set_latest( - Arc::try_unwrap(new_dataset.new_dataset).unwrap_or_else(|arc| (*arc).clone()), - ) - .await; - - if let Err(e) = self.run_inline_optimization().await { - log::warn!( - "Unexpected failure when running inline optimization: {:?}", - e - ); - } - - Ok(deleted_count) + let object_ids = object_ids.iter().cloned().collect::>(); + self.rewrite_manifest("Failed to delete table versions from manifest", || { + DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::ObjectIds(object_ids.clone()), + deleted_count: 0, + } + }) + .await } /// Set a property flag in the __manifest table's metadata key-value map. @@ -2866,15 +3981,230 @@ impl LanceNamespace for ManifestNamespace { #[cfg(test)] mod tests { - use crate::{DirectoryNamespaceBuilder, ManifestNamespace}; + use super::{ + BASE_OBJECTS_INDEX_NAME, ConflictResolution, CopyOnWriteMutation, DeleteObjectMutation, + LANCE_DATA_DIR, LANCE_INDICES_DIR, MANIFEST_TABLE_NAME, ManifestBatchBuilder, + ManifestEntry, ManifestIndexAccumulator, ManifestNamespace, ManifestOutputRow, + ManifestRowValue, ManifestStreamMutation, OBJECT_ID_INDEX_NAME, OBJECT_TYPE_INDEX_NAME, + ObjectType, + }; + use crate::DirectoryNamespaceBuilder; + use arrow::datatypes::DataType; use bytes::Bytes; + use futures::StreamExt; + use lance::index::DatasetIndexExt; use lance_core::utils::tempfile::TempStdDir; + use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_namespace::LanceNamespace; use lance_namespace::models::{ CreateNamespaceRequest, CreateTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest, TableExistsRequest, }; + use lance_table::format::Fragment; use rstest::rstest; + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + + async fn create_manifest_namespace( + root: &str, + inline_optimization_enabled: bool, + ) -> ManifestNamespace { + create_manifest_namespace_with_retries(root, inline_optimization_enabled, None).await + } + + async fn create_manifest_namespace_with_retries( + root: &str, + inline_optimization_enabled: bool, + commit_retries: Option, + ) -> ManifestNamespace { + let (object_store, base_path) = ObjectStore::from_uri_and_params( + Arc::new(ObjectStoreRegistry::default()), + root, + &ObjectStoreParams::default(), + ) + .await + .unwrap(); + ManifestNamespace::from_directory( + root.to_string(), + None, + None, + object_store, + base_path, + true, + inline_optimization_enabled, + commit_retries, + false, + ) + .await + .unwrap() + } + + struct CommitConflictAfterRewriteMutation { + root: String, + conflict_object_id: String, + } + + impl ManifestStreamMutation for CommitConflictAfterRewriteMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> lance_core::Result<()> { + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + base_objects: row.base_objects.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> lance_core::Result<()> { + output.append( + index_data, + ManifestOutputRow { + object_id: "attempted_table", + object_type: ObjectType::Table, + location: Some("attempted_table.lance"), + metadata: None, + base_objects: None, + }, + ) + } + + fn finish(&self) -> CopyOnWriteMutation { + let root = self.root.clone(); + let object_id = self.conflict_object_id.clone(); + std::thread::spawn(move || { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async move { + let writer = create_manifest_namespace(&root, false).await; + writer + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id, + object_type: ObjectType::Table, + location: Some("conflicting_table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + }); + }) + .join() + .unwrap(); + CopyOnWriteMutation::updated(()) + } + } + + /// A delete mutation that, during staging, has a concurrent writer delete the same + /// object and commit first, so our own commit hits a conflict while the object is + /// already gone — exercising `ConflictResolution::SucceedIfAbsent`. + struct ConcurrentDeleteBeforeCommitMutation { + inner: DeleteObjectMutation, + root: String, + target: String, + } + + impl ManifestStreamMutation for ConcurrentDeleteBeforeCommitMutation { + type Output = (); + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> lance_core::Result<()> { + self.inner.process_existing_row(row, output, index_data) + } + + fn append_rows( + &mut self, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> lance_core::Result<()> { + self.inner.append_rows(output, index_data) + } + + fn finish(&self) -> CopyOnWriteMutation { + let root = self.root.clone(); + let target = self.target.clone(); + std::thread::spawn(move || { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async move { + let writer = create_manifest_namespace(&root, false).await; + writer.delete_from_manifest(&target).await.unwrap(); + }); + }) + .join() + .unwrap(); + self.inner.finish() + } + + fn conflict_resolution(&self) -> ConflictResolution { + ConflictResolution::SucceedIfAbsent { + object_id: self.target.clone(), + output: (), + } + } + } + + async fn manifest_base_objects( + manifest_ns: &ManifestNamespace, + ) -> HashMap>> { + let mut scanner = manifest_ns.manifest_scanner().await.unwrap(); + scanner.project(&["object_id", "base_objects"]).unwrap(); + let batches = ManifestNamespace::execute_scanner(scanner).await.unwrap(); + let mut rows = HashMap::new(); + for batch in batches { + let object_ids = ManifestNamespace::get_string_column(&batch, "object_id").unwrap(); + let base_objects = ManifestNamespace::base_objects_column_values(&batch).unwrap(); + for (row, value) in base_objects.into_iter().enumerate() { + rows.insert(object_ids.value(row).to_string(), value); + } + } + rows + } + + async fn manifest_data_paths(manifest_ns: &ManifestNamespace) -> HashSet { + let data_dir = manifest_ns + .base_path + .clone() + .join(MANIFEST_TABLE_NAME) + .join(LANCE_DATA_DIR); + let mut stream = manifest_ns.object_store.read_dir_all(&data_dir, None); + let mut paths = HashSet::new(); + while let Some(meta) = stream.next().await.transpose().unwrap() { + paths.insert(meta.location.to_string()); + } + paths + } + + async fn manifest_index_paths(manifest_ns: &ManifestNamespace) -> HashSet { + let index_dir = manifest_ns + .base_path + .clone() + .join(MANIFEST_TABLE_NAME) + .join(LANCE_INDICES_DIR); + let mut stream = manifest_ns.object_store.read_dir_all(&index_dir, None); + let mut paths = HashSet::new(); + while let Some(meta) = stream.next().await.transpose().unwrap() { + paths.insert(meta.location.to_string()); + } + paths + } fn create_test_ipc_data() -> Vec { use arrow::array::{Int32Array, StringArray}; @@ -2906,6 +4236,475 @@ mod tests { buffer } + #[tokio::test] + async fn test_manifest_rewrite_preserves_utf8_metadata_and_base_objects() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, true).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "view".to_string(), + object_type: ObjectType::Table, + location: Some("view.lance".to_string()), + metadata: Some(r#"{"kind":"view"}"#.to_string()), + }], + Some(vec!["base_a".to_string(), "base_b".to_string()]), + ) + .await + .unwrap(); + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "other".to_string(), + object_type: ObjectType::Namespace, + location: None, + metadata: Some(r#"{"kind":"namespace"}"#.to_string()), + }], + None, + ) + .await + .unwrap(); + + let dataset_guard = manifest_ns.manifest_dataset.get().await.unwrap(); + let metadata_field = dataset_guard.schema().field("metadata").unwrap(); + assert_eq!(metadata_field.data_type(), DataType::Utf8); + drop(dataset_guard); + + let base_objects = manifest_base_objects(&manifest_ns).await; + assert_eq!( + base_objects.get("view").cloned().unwrap(), + Some(vec!["base_a".to_string(), "base_b".to_string()]) + ); + assert_eq!(base_objects.get("other").cloned().unwrap(), None); + } + + #[tokio::test] + async fn test_manifest_rewrite_replacement_indices_are_versioned() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, true).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + Some(vec!["base".to_string()]), + ) + .await + .unwrap(); + + let dataset_guard = manifest_ns.manifest_dataset.get().await.unwrap(); + let dataset_version = dataset_guard.version().version; + let indices = dataset_guard.load_indices().await.unwrap(); + let names = indices + .iter() + .map(|index| index.name.as_str()) + .collect::>(); + assert!(names.contains(OBJECT_ID_INDEX_NAME)); + assert!(names.contains(OBJECT_TYPE_INDEX_NAME)); + assert!(names.contains(BASE_OBJECTS_INDEX_NAME)); + for index in indices.iter() { + assert_eq!(index.dataset_version, dataset_version); + assert!(!index.fragment_bitmap.as_ref().unwrap().is_empty()); + } + } + + #[tokio::test] + async fn test_manifest_rewrite_empty_manifest_keeps_replacement_indices_valid() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, true).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + manifest_ns.delete_from_manifest("table").await.unwrap(); + + assert!(!manifest_ns.manifest_contains_object("table").await.unwrap()); + let mut scanner = manifest_ns.manifest_scanner().await.unwrap(); + scanner.project(&["object_id"]).unwrap(); + let rows = ManifestNamespace::execute_scanner(scanner) + .await + .unwrap() + .into_iter() + .map(|batch| batch.num_rows()) + .sum::(); + assert_eq!(rows, 0); + + let dataset_guard = manifest_ns.manifest_dataset.get().await.unwrap(); + let dataset_version = dataset_guard.version().version; + let indices = dataset_guard.load_indices().await.unwrap(); + let names = indices + .iter() + .map(|index| index.name.as_str()) + .collect::>(); + assert!(names.contains(OBJECT_ID_INDEX_NAME)); + assert!(names.contains(OBJECT_TYPE_INDEX_NAME)); + assert!(names.contains(BASE_OBJECTS_INDEX_NAME)); + for index in indices.iter() { + assert_eq!(index.dataset_version, dataset_version); + } + } + + #[tokio::test] + async fn test_manifest_rewrite_fragment_bitmap_uses_overwrite_fragment_ids() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + let dataset_guard = manifest_ns.manifest_dataset.get().await.unwrap(); + let fragments = vec![Fragment::new(0), Fragment::new(0), Fragment::new(7)]; + + let manifest = ManifestNamespace::manifest_from_overwrite_transaction( + dataset_guard.manifest(), + dataset_guard.manifest().schema.clone(), + &fragments, + ); + + let fragment_ids = manifest + .fragments + .iter() + .map(|fragment| fragment.id) + .collect::>(); + assert_eq!(fragment_ids, vec![0, 1, 7]); + assert_eq!( + ManifestNamespace::manifest_fragment_bitmap(&manifest) + .unwrap() + .into_iter() + .collect::>(), + vec![0, 1, 7] + ); + } + + #[tokio::test] + async fn test_manifest_delete_table_versions_by_ranges() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + let table_id = "table"; + let entries = (1..=5) + .map(|version| ManifestEntry { + object_id: ManifestNamespace::build_version_object_id(table_id, version), + object_type: ObjectType::TableVersion, + location: None, + metadata: Some( + serde_json::json!({ + "manifest_path": format!("_versions/{}.manifest", version), + }) + .to_string(), + ), + }) + .collect::>(); + manifest_ns + .insert_into_manifest_with_metadata(entries, None) + .await + .unwrap(); + + let deleted = manifest_ns + .delete_table_versions(table_id, &[(2, 3), (5, 5)]) + .await + .unwrap(); + assert_eq!(deleted, 3); + + let remaining = manifest_ns + .query_table_versions(table_id, false, None) + .await + .unwrap() + .into_iter() + .map(|(version, _)| version) + .collect::>(); + assert_eq!(remaining, vec![1, 4]); + } + + #[tokio::test] + async fn test_manifest_delete_table_versions_by_object_ids() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + let table_id = "table"; + let entries = (1..=3) + .map(|version| ManifestEntry { + object_id: ManifestNamespace::build_version_object_id(table_id, version), + object_type: ObjectType::TableVersion, + location: None, + metadata: Some( + serde_json::json!({ + "manifest_path": format!("_versions/{}.manifest", version), + }) + .to_string(), + ), + }) + .collect::>(); + manifest_ns + .insert_into_manifest_with_metadata(entries, None) + .await + .unwrap(); + + let object_ids = vec![ + ManifestNamespace::build_version_object_id(table_id, 1), + ManifestNamespace::build_version_object_id(table_id, 3), + ]; + let deleted = manifest_ns + .batch_delete_table_versions_by_object_ids(&object_ids) + .await + .unwrap(); + assert_eq!(deleted, 2); + + let remaining = manifest_ns + .query_table_versions(table_id, false, None) + .await + .unwrap() + .into_iter() + .map(|(version, _)| version) + .collect::>(); + assert_eq!(remaining, vec![2]); + } + + #[tokio::test] + async fn test_manifest_noop_delete_uses_latest_snapshot() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let stale_ns = create_manifest_namespace(temp_path, false).await; + let writer_ns = create_manifest_namespace(temp_path, false).await; + + writer_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "late_table".to_string(), + object_type: ObjectType::Table, + location: Some("late_table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + + stale_ns.delete_from_manifest("late_table").await.unwrap(); + + let check_ns = create_manifest_namespace(temp_path, false).await; + assert!( + !check_ns + .manifest_contains_object("late_table") + .await + .unwrap() + ); + } + + #[tokio::test] + async fn test_manifest_noop_delete_cleans_uncommitted_data_file() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + + let before = manifest_data_paths(&manifest_ns).await; + assert!(!before.is_empty()); + + manifest_ns + .delete_from_manifest("missing_table") + .await + .unwrap(); + + let after = manifest_data_paths(&manifest_ns).await; + assert_eq!(after, before); + } + + #[tokio::test] + async fn test_manifest_final_commit_failure_cleans_uncommitted_rewrite_files() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace_with_retries(temp_path, true, Some(0)).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + + let before_data_paths = manifest_data_paths(&manifest_ns).await; + let before_index_paths = manifest_index_paths(&manifest_ns).await; + + let result = manifest_ns + .rewrite_manifest("Failed to test manifest cleanup", || { + CommitConflictAfterRewriteMutation { + root: temp_path.to_string(), + conflict_object_id: "conflicting_table".to_string(), + } + }) + .await; + assert!(result.is_err()); + + let after_data_paths = manifest_data_paths(&manifest_ns).await; + assert!(before_data_paths.is_subset(&after_data_paths)); + assert_eq!(after_data_paths.len(), before_data_paths.len() + 1); + assert_eq!(manifest_index_paths(&manifest_ns).await, before_index_paths); + assert!( + manifest_ns + .manifest_contains_object("conflicting_table") + .await + .unwrap() + ); + assert!( + !manifest_ns + .manifest_contains_object("attempted_table") + .await + .unwrap() + ); + } + + #[tokio::test] + async fn test_manifest_commit_uses_inline_transaction() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + + let dataset_guard = manifest_ns.manifest_dataset.get().await.unwrap(); + let manifest = dataset_guard.manifest(); + // The overwrite transaction is embedded inline in the manifest, never written as a + // separate _transactions/*.txn file. + assert!(manifest.transaction_section.is_some()); + assert!(manifest.transaction_file.is_none()); + } + + #[tokio::test] + async fn test_manifest_commit_landed_attributes_data_file() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace(temp_path, false).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + + let dataset = Arc::new(manifest_ns.manifest_dataset.get().await.unwrap().clone()); + let version = dataset.manifest().version; + let our_files = dataset + .manifest() + .fragments + .iter() + .flat_map(|fragment| fragment.files.iter()) + .map(|file| file.path.clone()) + .collect::>(); + assert!(!our_files.is_empty()); + + // The committed version references our data file => attributed to us (a lost-ack + // commit must be treated as success, not cleaned up). + assert!( + manifest_ns + .manifest_commit_landed(&dataset, version, &our_files) + .await + ); + // A different file set is not attributed to us. + let other = HashSet::from(["missing.lance".to_string()]); + assert!( + !manifest_ns + .manifest_commit_landed(&dataset, version, &other) + .await + ); + // A version that does not exist did not land. + assert!( + !manifest_ns + .manifest_commit_landed(&dataset, version + 100, &our_files) + .await + ); + } + + #[tokio::test] + async fn test_manifest_delete_conflict_with_concurrent_delete_succeeds() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let manifest_ns = create_manifest_namespace_with_retries(temp_path, false, Some(0)).await; + + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + assert!(manifest_ns.manifest_contains_object("table").await.unwrap()); + + // A concurrent writer deletes "table" and commits first, so our own delete commit + // conflicts while "table" is already gone. Native resolution treats the goal as + // achieved and succeeds instead of erroring or retrying forever. + let result = manifest_ns + .rewrite_manifest("Failed to delete from manifest", || { + ConcurrentDeleteBeforeCommitMutation { + inner: DeleteObjectMutation { + object_id: "table".to_string(), + deleted: false, + }, + root: temp_path.to_string(), + target: "table".to_string(), + } + }) + .await; + + assert!(result.is_ok(), "delete should succeed: {result:?}"); + assert!(!manifest_ns.manifest_contains_object("table").await.unwrap()); + } + #[rstest] #[case::with_optimization(true)] #[case::without_optimization(false)] @@ -3939,9 +5738,9 @@ mod tests { /// Test that concurrent create_table calls for the same table name don't /// create duplicate entries in the manifest. Uses two independent /// ManifestNamespace instances pointing at the same directory to simulate - /// two separate OS processes racing on table creation. The conflict_retries - /// setting on the MergeInsert ensures the second operation properly detects - /// the duplicate via WhenMatched::Fail after retrying against the latest data. + /// two separate OS processes racing on table creation. Copy-on-write rewrite + /// retries ensure the second operation detects the duplicate after retrying + /// against the latest data. #[tokio::test] async fn test_concurrent_create_table_no_duplicates() { let temp_dir = TempStdDir::default(); From 9ff6552ebe959091333ebfdeff8a0947536d24c1 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 10 Jun 2026 23:54:09 -0700 Subject: [PATCH 2/3] test(dir-catalog): add __manifest commit benchmark sweep --- rust/lance-namespace-impls/BENCHMARK.md | 73 ++ rust/lance-namespace-impls/Cargo.toml | 5 + .../benches/manifest_commit_sweep.sh | 146 ++++ .../examples/manifest_bench.rs | 714 ++++++++++++++++++ 4 files changed, 938 insertions(+) create mode 100644 rust/lance-namespace-impls/BENCHMARK.md create mode 100755 rust/lance-namespace-impls/benches/manifest_commit_sweep.sh create mode 100644 rust/lance-namespace-impls/examples/manifest_bench.rs diff --git a/rust/lance-namespace-impls/BENCHMARK.md b/rust/lance-namespace-impls/BENCHMARK.md new file mode 100644 index 00000000000..074ec303347 --- /dev/null +++ b/rust/lance-namespace-impls/BENCHMARK.md @@ -0,0 +1,73 @@ +# `__manifest` commit benchmark + +Measures how fast the copy-on-write directory catalog commits `__manifest` mutations as +the manifest scales, with the inline scalar indices on or off. + +The catalog commits every mutation by rewriting the whole `__manifest` (copy-on-write) +and atomically writing a new manifest version. This benchmark characterises: + +- **Continuous commit** — a single process commits `N` times into a manifest already + holding `rows` entries (per-commit latency + throughput). +- **Concurrent commit** — `C` processes commit continuously for a fixed duration against + a manifest of `rows` entries (steady, contended TPS). + +## Binary: `examples/manifest_bench.rs` + +``` +manifest_bench seed-large --root --count --inline-optimization \ + [--storage-option aws_region=us-east-1] +manifest_bench run --root --operation write-create-namespace \ + --concurrency 1 --operations 100 --initial-entries --inline-optimization # continuous +manifest_bench run --root --operation write-create-namespace \ + --concurrency 50 --duration-secs 30 --initial-entries --inline-optimization # concurrent +``` + +- `seed-large` bootstraps a manifest to `count` rows by writing the Lance dataset + directly (O(rows) once) and then triggering one CoW rewrite so the on-disk state + matches the steady catalog form (single fragment; inline indices when enabled). +- `run` spawns `--concurrency` worker subprocesses. With `--operations` it runs a fixed + commit budget (continuous); with `--duration-secs` each worker commits until the + deadline (steady TPS). It prints one JSON `BenchResult` per concurrency level with + throughput and p50/p90/p99 latency. +- The committed operation (`--operation`) defaults to `write-create-namespace`, the + cheapest pure-`__manifest` mutation (no table data). `write-create-table` / + `write-declare-table` are also available. + +S3 requires the default `dir-aws` feature (on by default) and AWS credentials in the +environment; pass `--storage-option aws_region=`. + +## Sweep panel: `benches/manifest_commit_sweep.sh` + +Runs the full panel — sizes × {inline index, no index} × {continuous, concurrent×C} — +with per-run S3-copy isolation (each run starts at exactly the bootstrapped size), +JSONL results, a `summary.csv`, and resume support. + +```bash +cargo build --release --example manifest_bench -p lance-namespace-impls +S3_BASE=s3:///manifest-cow-bench/$(date -u +%Y%m%dT%H%M%SZ) \ + rust/lance-namespace-impls/benches/manifest_commit_sweep.sh +``` + +Default panel (override via env): `SIZES="1000 2000 5000 10000 20000 50000 100000 200000 +500000 1000000"`, `CONCURRENCY="10 20 50 100 120 150 200"`, `INLINE_VARIANTS="true false"`, +`CONT_OPS=100`, `CONC_DURATION_SECS=30`. Results land in `$OUT_DIR` (default +`~/manifest_cow_bench_`). + +## Representative results + +EC2 `c7i.48xlarge`, S3 `us-east-1`, op `write-create-namespace`. The catalog is a +single-writer-throughput system: per-commit cost scales ~O(rows) and throughput does **not** +scale with concurrency (every commit is a serialized `__manifest` version bump). + +Continuous (1 process, 100 commits), ops/s — inline index vs no index: + +| rows | inline | no index | +|---:|---:|---:| +| 1,000 | 2.0 | 3.5 | +| 100,000 | 1.1 | 2.1 | +| 1,000,000 | 0.34 | 0.53 | + +Concurrent steady TPS is flat across C=10..200 (e.g. inline @100k ≈ 1.4–1.5 ops/s at every C; +@1M ≈ 0.3 ops/s). Conflicts that exceed the retry budget surface as errors and grow with C +(≈0 at C≤20, climbing at C≥100) — the contention ceiling, not data loss. No-index commits run +~1.5–2× faster (no per-commit index build) at the cost of unindexed reads. diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index 56b8a5864ea..c2bf057ee21 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -100,6 +100,11 @@ rstest.workspace = true lance-table.workspace = true lance-arrow = { workspace = true } lance = { workspace = true } +serde = { workspace = true, features = ["derive"] } + +[[example]] +name = "manifest_bench" +path = "examples/manifest_bench.rs" [lints] workspace = true diff --git a/rust/lance-namespace-impls/benches/manifest_commit_sweep.sh b/rust/lance-namespace-impls/benches/manifest_commit_sweep.sh new file mode 100755 index 00000000000..7384ced4152 --- /dev/null +++ b/rust/lance-namespace-impls/benches/manifest_commit_sweep.sh @@ -0,0 +1,146 @@ +#!/usr/bin/env bash +# Copy-on-write __manifest commit benchmark sweep panel. +# +# Drives `cargo run --release --example manifest_bench` across a panel of: +# - bootstrap manifest sizes (rows already in __manifest) +# - inline scalar indices on vs off +# - continuous commit (single process, N commits) and +# concurrent commit (C processes, steady TPS over a fixed duration) +# +# Each run is isolated: a "golden" manifest is bootstrapped once per (size, index) +# and server-side-copied to a fresh S3 prefix per run, so every run starts at exactly +# the bootstrapped size. Results are written as JSONL (one BenchResult per line) and +# summarised to CSV. The sweep is resumable: completed runs are skipped. +# +# Usage: +# S3_BASE=s3://jack-devland-build/manifest-cow-bench/$(date -u +%Y%m%dT%H%M%SZ) \ +# ./manifest_commit_sweep.sh +# +# Env knobs (defaults match the requested panel): +# SIZES, CONCURRENCY, INLINE_VARIANTS, CONT_OPS, CONC_DURATION_SECS, +# AWS_REGION, OUT_DIR, BIN +# +# Resilient by design: a single failed run is logged and skipped rather than aborting +# the sweep, and re-running fills the gaps (completed runs are detected and skipped). +set -uo pipefail + +RUN_ID="${RUN_ID:-$(date -u +%Y%m%dT%H%M%SZ)}" +S3_BASE="${S3_BASE:?set S3_BASE, e.g. s3://jack-devland-build/manifest-cow-bench/$RUN_ID}" +AWS_REGION="${AWS_REGION:-us-east-1}" +export AWS_REGION AWS_DEFAULT_REGION="$AWS_REGION" + +REPO_ROOT="${REPO_ROOT:-$HOME/oss/lance}" +BIN="${BIN:-$REPO_ROOT/target/release/examples/manifest_bench}" +OUT_DIR="${OUT_DIR:-$HOME/manifest_cow_bench_${RUN_ID}}" +RESULTS="$OUT_DIR/results.jsonl" +PROGRESS="$OUT_DIR/progress.log" +mkdir -p "$OUT_DIR" + +SIZES=(${SIZES:-1000 2000 5000 10000 20000 50000 100000 200000 500000 1000000}) +CONCURRENCY=(${CONCURRENCY:-10 20 50 100 120 150 200}) +INLINE_VARIANTS=(${INLINE_VARIANTS:-true false}) +CONT_OPS="${CONT_OPS:-100}" +CONC_DURATION_SECS="${CONC_DURATION_SECS:-30}" +STORAGE_OPT=(--storage-option "aws_region=${AWS_REGION}") + +log() { printf '%s %s\n' "$(date -u +%H:%M:%S)" "$*" | tee -a "$PROGRESS"; } + +# Skip a run if its tag already appears in results.jsonl (resume support). +done_already() { grep -q "\"bench_tag\":\"$1\"" "$RESULTS" 2>/dev/null; } + +# Append a result line, tagging it so reruns can resume and we can pivot later. +record() { + local tag="$1"; shift + # shellcheck disable=SC2016 + python3 -c 'import json,sys; d=json.load(sys.stdin); d["bench_tag"]=sys.argv[1]; print(json.dumps(d))' \ + "$tag" >> "$RESULTS" +} + +s3_copy() { aws s3 cp --recursive --quiet "$1" "$2" --region "$AWS_REGION"; } +s3_rm() { aws s3 rm --recursive --quiet "$1" --region "$AWS_REGION" || true; } + +# Backstops for unattended runs: cap any single run and clear leaked worker processes +# (a killed coordinator can orphan its worker children) before the next run. +RUN_TIMEOUT="${RUN_TIMEOUT:-1200}" +clear_stragglers() { pkill -f 'examples/manifest_bench worker' 2>/dev/null || true; sleep 1; } + +for inline in "${INLINE_VARIANTS[@]}"; do + for rows in "${SIZES[@]}"; do + golden="${S3_BASE}/golden/inline_${inline}_rows_${rows}" + boot_tag="boot_inline_${inline}_rows_${rows}" + + if ! done_already "$boot_tag"; then + log "BOOTSTRAP inline=$inline rows=$rows -> $golden" + s3_rm "$golden" + if "$BIN" seed-large --root "$golden" --count "$rows" \ + --inline-optimization "$inline" "${STORAGE_OPT[@]}"; then + echo "{\"bench_tag\":\"$boot_tag\"}" >> "$RESULTS" + else + log "BOOTSTRAP FAILED inline=$inline rows=$rows (skipping this size)" + continue + fi + else + log "skip bootstrap $boot_tag (done)" + fi + + # ---- Continuous: single process, CONT_OPS commits ---- + cont_tag="cont_inline_${inline}_rows_${rows}" + if ! done_already "$cont_tag"; then + run_prefix="${S3_BASE}/run/${cont_tag}" + log "CONTINUOUS inline=$inline rows=$rows ops=$CONT_OPS" + clear_stragglers + s3_copy "$golden" "$run_prefix" + timeout "$RUN_TIMEOUT" "$BIN" run --root "$run_prefix" --operation write-create-namespace \ + --concurrency 1 --operations "$CONT_OPS" --initial-entries "$rows" \ + --inline-optimization "$inline" "${STORAGE_OPT[@]}" \ + 2>>"$PROGRESS" | while read -r line; do record "$cont_tag" <<<"$line"; done + s3_rm "$run_prefix" + else + log "skip continuous $cont_tag (done)" + fi + + # ---- Concurrent: C processes, steady TPS over CONC_DURATION_SECS ---- + for c in "${CONCURRENCY[@]}"; do + conc_tag="conc_inline_${inline}_rows_${rows}_c_${c}" + if done_already "$conc_tag"; then log "skip concurrent $conc_tag (done)"; continue; fi + run_prefix="${S3_BASE}/run/${conc_tag}" + log "CONCURRENT inline=$inline rows=$rows c=$c dur=${CONC_DURATION_SECS}s" + clear_stragglers + s3_copy "$golden" "$run_prefix" + timeout "$RUN_TIMEOUT" "$BIN" run --root "$run_prefix" --operation write-create-namespace \ + --concurrency "$c" --duration-secs "$CONC_DURATION_SECS" --initial-entries "$rows" \ + --inline-optimization "$inline" "${STORAGE_OPT[@]}" \ + 2>>"$PROGRESS" | while read -r line; do record "$conc_tag" <<<"$line"; done + s3_rm "$run_prefix" + done + done +done + +# ---- Summarise to CSV ---- +CSV="$OUT_DIR/summary.csv" +python3 - "$RESULTS" "$CSV" <<'PY' +import json, sys, csv +rows = [] +with open(sys.argv[1]) as f: + for line in f: + d = json.loads(line) + if "throughput_ops_per_sec" not in d: + continue # bootstrap marker + mode = "continuous" if d["duration_secs"] == 0 else "concurrent" + rows.append({ + "mode": mode, "variant": d["variant"], "initial_entries": d["initial_entries"], + "concurrency": d["concurrency"], "duration_secs": d["duration_secs"], + "ops": d["total_operations"], "errors": d["errors"], + "tps": round(d["throughput_ops_per_sec"], 3), + "avg_ms": round(d["avg_latency_ms"], 2), "p50_ms": round(d["p50_latency_ms"], 2), + "p90_ms": round(d["p90_latency_ms"], 2), "p99_ms": round(d["p99_latency_ms"], 2), + }) +rows.sort(key=lambda r: (r["mode"], r["variant"], r["initial_entries"], r["concurrency"])) +with open(sys.argv[2], "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else []) + w.writeheader(); w.writerows(rows) +print(f"wrote {len(rows)} rows to {sys.argv[2]}") +PY + +log "SWEEP COMPLETE. Results: $RESULTS Summary: $CSV" +s3_rm "${S3_BASE}/golden" "${S3_BASE}/run" 2>/dev/null || true diff --git a/rust/lance-namespace-impls/examples/manifest_bench.rs b/rust/lance-namespace-impls/examples/manifest_bench.rs new file mode 100644 index 00000000000..4841f2471d7 --- /dev/null +++ b/rust/lance-namespace-impls/examples/manifest_bench.rs @@ -0,0 +1,714 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Copy-on-write `__manifest` directory-catalog commit benchmark (S3 capable). +//! +//! Measures how fast the directory catalog commits `__manifest` mutations as the +//! manifest scales, with the inline scalar indices on or off. +//! +//! Modes: +//! seed-large — bootstrap a `__manifest` with N rows (direct dataset write + one +//! CoW rewrite to build indices) +//! run — coordinator: spawn `--concurrency` worker processes committing for +//! either a fixed op count (continuous) or a fixed duration (steady TPS) +//! worker — (internal) a single committing process spawned by `run` +//! +//! Examples: +//! # Bootstrap 100k rows with inline indices +//! manifest_bench seed-large --root s3://bucket/bench/p --count 100000 \ +//! --inline-optimization true --storage-option aws_region=us-east-1 +//! +//! # Continuous: 100 commits, single process +//! manifest_bench run --root s3://bucket/bench/p --operation write-create-namespace \ +//! --concurrency 1 --operations 100 --initial-entries 100000 --inline-optimization true +//! +//! # Concurrent steady TPS: 50 processes committing for 30s +//! manifest_bench run --root s3://bucket/bench/p --operation write-create-namespace \ +//! --concurrency 50 --duration-secs 30 --initial-entries 100000 --inline-optimization true + +// A CLI benchmark tool: workers emit JSON latency records on stdout and progress on +// stderr, so stdout/stderr printing is intentional here. +#![allow(clippy::print_stdout, clippy::print_stderr)] + +use std::collections::HashMap; +use std::io::{BufRead, BufReader}; +use std::process::{Command, Stdio}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use arrow::array::builder::{ListBuilder, StringBuilder}; +use arrow::array::{RecordBatch, RecordBatchIterator, StringArray}; +use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; +use bytes::Bytes; +use lance::dataset::{InsertBuilder, WriteMode, WriteParams}; +use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; +use lance_namespace::LanceNamespace; +use lance_namespace::models::{ + CreateNamespaceRequest, CreateTableRequest, DeclareTableRequest, DescribeTableRequest, + ListNamespacesRequest, ListTablesRequest, +}; +use lance_namespace_impls::DirectoryNamespaceBuilder; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +struct LatencyRecord { + operation: String, + latency_ms: f64, + error: bool, +} + +#[derive(Serialize)] +struct BenchResult { + variant: String, + operation: String, + concurrency: usize, + initial_entries: usize, + duration_secs: u64, + total_operations: usize, + total_duration_ms: f64, + throughput_ops_per_sec: f64, + avg_latency_ms: f64, + p50_latency_ms: f64, + p90_latency_ms: f64, + p99_latency_ms: f64, + min_latency_ms: f64, + max_latency_ms: f64, + errors: usize, +} + +fn percentile(sorted: &[f64], p: f64) -> f64 { + if sorted.is_empty() { + return 0.0; + } + let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +#[allow(clippy::too_many_arguments)] +fn compute_result( + variant: &str, + operation: &str, + concurrency: usize, + initial_entries: usize, + duration_secs: u64, + wall_duration: Duration, + mut latencies: Vec, + errors: usize, +) -> BenchResult { + latencies.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let total = latencies.len(); + let total_ms = wall_duration.as_secs_f64() * 1000.0; + let throughput = if total_ms > 0.0 { + total as f64 / (total_ms / 1000.0) + } else { + 0.0 + }; + BenchResult { + variant: variant.to_string(), + operation: operation.to_string(), + concurrency, + initial_entries, + duration_secs, + total_operations: total, + total_duration_ms: total_ms, + throughput_ops_per_sec: throughput, + avg_latency_ms: if total > 0 { + latencies.iter().sum::() / total as f64 + } else { + 0.0 + }, + p50_latency_ms: percentile(&latencies, 0.50), + p90_latency_ms: percentile(&latencies, 0.90), + p99_latency_ms: percentile(&latencies, 0.99), + min_latency_ms: latencies.first().copied().unwrap_or(0.0), + max_latency_ms: latencies.last().copied().unwrap_or(0.0), + errors, + } +} + +fn create_test_ipc_data() -> Vec { + use arrow::array::Int32Array; + use arrow_ipc::writer::StreamWriter; + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + let mut buffer = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buffer, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + buffer +} + +/// The `__manifest` schema used by the copy-on-write directory catalog: +/// `object_id`, `object_type`, `location`, `metadata` (Utf8), `base_objects` (List). +fn manifest_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + Field::new("object_id", DataType::Utf8, false).with_metadata( + [( + LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(), + "0".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("object_type", DataType::Utf8, false), + Field::new("location", DataType::Utf8, true), + Field::new("metadata", DataType::Utf8, true), + Field::new( + "base_objects", + DataType::List(Arc::new(Field::new("object_id", DataType::Utf8, true))), + true, + ), + ])) +} + +async fn build_namespace( + root: &str, + inline_optimization: bool, + storage_options: &HashMap, +) -> Box { + let mut properties = HashMap::new(); + properties.insert("root".to_string(), root.to_string()); + properties.insert("dir_listing_enabled".to_string(), "false".to_string()); + properties.insert( + "inline_optimization_enabled".to_string(), + inline_optimization.to_string(), + ); + for (k, v) in storage_options { + properties.insert(format!("storage.{}", k), v.clone()); + } + let builder = DirectoryNamespaceBuilder::from_properties(properties, None) + .expect("Failed to create namespace builder from properties"); + Box::new(builder.build().await.expect("Failed to build namespace")) +} + +// ──────────────────── seed-large mode ──────────────────── +// Bootstrap a `__manifest` with N rows by writing the Lance dataset directly (fast, +// O(N) once), then trigger a single CoW rewrite via the namespace so the on-disk state +// matches what the catalog produces (single fragment + inline indices when enabled). + +const SEED_LARGE_BATCH_SIZE: usize = 50_000; + +fn generate_manifest_batch(start_idx: usize, batch_size: usize, total_count: usize) -> RecordBatch { + let ns_count = total_count / 3; + let actual_size = batch_size.min(total_count - start_idx); + + let mut object_ids = Vec::with_capacity(actual_size); + let mut object_types = Vec::with_capacity(actual_size); + let mut locations: Vec> = Vec::with_capacity(actual_size); + let mut metadatas: Vec> = Vec::with_capacity(actual_size); + + for i in start_idx..start_idx + actual_size { + if i < ns_count { + object_ids.push(format!("ns_{}", i)); + object_types.push("namespace".to_string()); + locations.push(None); + metadatas.push(None); + } else { + let table_idx = i - ns_count; + object_ids.push(format!("table_{}", table_idx)); + object_types.push("table".to_string()); + locations.push(Some(format!("table_{}", table_idx))); + metadatas.push(Some(r#"{"bench":"true"}"#.to_string())); + } + } + + // base_objects is null for every bootstrapped row. + let mut base_objects_builder = ListBuilder::new(StringBuilder::new()) + .with_field(Arc::new(Field::new("object_id", DataType::Utf8, true))); + for _ in 0..actual_size { + base_objects_builder.append_null(); + } + + RecordBatch::try_new( + manifest_schema(), + vec![ + Arc::new(StringArray::from(object_ids)), + Arc::new(StringArray::from(object_types)), + Arc::new(StringArray::from( + locations.iter().map(|l| l.as_deref()).collect::>(), + )), + Arc::new(StringArray::from( + metadatas.iter().map(|m| m.as_deref()).collect::>(), + )), + Arc::new(base_objects_builder.finish()), + ], + ) + .expect("Failed to create manifest batch") +} + +async fn seed_large( + root: &str, + count: usize, + inline_optimization: bool, + storage_options: &HashMap, +) { + let manifest_uri = format!("{}/{}", root, "__manifest"); + eprintln!("Seed-large: writing {} rows to {}", count, manifest_uri); + + let schema = manifest_schema(); + let mut batches = Vec::new(); + let mut offset = 0; + while offset < count { + let batch_size = SEED_LARGE_BATCH_SIZE.min(count - offset); + batches.push(generate_manifest_batch(offset, batch_size, count)); + offset += batch_size; + } + eprintln!(" generated {} batches", batches.len()); + + let mut write_params = WriteParams { + mode: WriteMode::Create, + ..WriteParams::default() + }; + if !storage_options.is_empty() { + let accessor = Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + storage_options.clone(), + ), + ); + write_params.store_params = Some(lance_io::object_store::ObjectStoreParams { + storage_options_accessor: Some(accessor), + ..Default::default() + }); + } + + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + InsertBuilder::new(manifest_uri.as_str()) + .with_params(&write_params) + .execute_stream(reader) + .await + .expect("Failed to write manifest dataset"); + eprintln!(" wrote Lance dataset"); + + // Trigger one CoW rewrite so the manifest is in steady catalog form (single + // fragment; inline indices when enabled). For the no-index variant the first real + // commit performs this rewrite instead. + if inline_optimization { + eprintln!(" triggering initial CoW rewrite to build indices..."); + let start = Instant::now(); + let ns = build_namespace(root, true, storage_options).await; + let mut req = CreateNamespaceRequest::new(); + req.id = Some(vec!["__seed_trigger__".to_string()]); + ns.create_namespace(req) + .await + .expect("Failed to trigger CoW rewrite"); + eprintln!( + " CoW rewrite with index build took {:.1}s", + start.elapsed().as_secs_f64() + ); + } + + let ns_count = count / 3; + eprintln!( + "Seed-large complete: {} rows ({} namespaces, {} tables)", + count, + ns_count, + count - ns_count + ); +} + +// ──────────────────── worker mode ──────────────────── + +#[allow(clippy::too_many_arguments)] +async fn worker( + root: &str, + operation: &str, + operations: usize, + duration_secs: u64, + warmup: usize, + worker_id: usize, + table_count: usize, + inline_optimization: bool, + storage_options: &HashMap, +) { + let ns = build_namespace(root, inline_optimization, storage_options).await; + let ipc_data = Bytes::from(create_test_ipc_data()); + + if operation.starts_with("warm-read") { + for _ in 0..warmup { + let _ = + run_operation(ns.as_ref(), operation, worker_id, 0, table_count, &ipc_data).await; + } + } + + let emit = |op_idx: usize, start: Instant, err: bool| { + let record = LatencyRecord { + operation: operation.to_string(), + latency_ms: start.elapsed().as_secs_f64() * 1000.0, + error: err, + }; + let _ = op_idx; + println!("{}", serde_json::to_string(&record).unwrap()); + }; + + if duration_secs > 0 { + // Steady-TPS mode: commit continuously until the deadline. + let deadline = Instant::now() + Duration::from_secs(duration_secs); + let mut op_idx = 0; + while Instant::now() < deadline { + let start = Instant::now(); + let err = run_operation( + ns.as_ref(), + operation, + worker_id, + op_idx, + table_count, + &ipc_data, + ) + .await + .is_err(); + emit(op_idx, start, err); + op_idx += 1; + } + } else { + for op_idx in 0..operations { + let start = Instant::now(); + let err = run_operation( + ns.as_ref(), + operation, + worker_id, + op_idx, + table_count, + &ipc_data, + ) + .await + .is_err(); + emit(op_idx, start, err); + } + } +} + +async fn run_operation( + ns: &dyn LanceNamespace, + operation: &str, + worker_id: usize, + op_idx: usize, + table_count: usize, + ipc_data: &Bytes, +) -> Result<(), Box> { + match operation { + "cold-read-list-namespaces" | "warm-read-list-namespaces" => { + let mut req = ListNamespacesRequest::new(); + req.id = Some(vec![]); + ns.list_namespaces(req).await?; + } + "cold-read-list-tables" | "warm-read-list-tables" => { + let mut req = ListTablesRequest::new(); + req.id = Some(vec![]); + ns.list_tables(req).await?; + } + "cold-read-describe-table" | "warm-read-describe-table" => { + let table_idx = (worker_id * 1_000_000 + op_idx) % table_count.max(1); + let req = DescribeTableRequest { + id: Some(vec![format!("table_{}", table_idx)]), + ..Default::default() + }; + ns.describe_table(req).await?; + } + "write-create-namespace" => { + let mut req = CreateNamespaceRequest::new(); + req.id = Some(vec![format!("bench_w{}_{}", worker_id, op_idx)]); + ns.create_namespace(req).await?; + } + "write-create-table" => { + let mut req = CreateTableRequest::new(); + req.id = Some(vec![format!("bench_t{}_{}", worker_id, op_idx)]); + ns.create_table(req, ipc_data.clone()).await?; + } + "write-declare-table" => { + let req = DeclareTableRequest { + id: Some(vec![format!("bench_d{}_{}", worker_id, op_idx)]), + ..Default::default() + }; + ns.declare_table(req).await?; + } + _ => { + return Err(format!("unknown operation: {}", operation).into()); + } + } + Ok(()) +} + +// ──────────────────── run mode (coordinator) ──────────────────── + +#[allow(clippy::too_many_arguments)] +fn run_workers( + self_exe: &str, + root: &str, + operation: &str, + concurrency: usize, + operations: usize, + duration_secs: u64, + warmup: usize, + table_count: usize, + initial_entries: usize, + inline_optimization: bool, + variant: &str, + storage_options: &HashMap, +) -> BenchResult { + // Continuous mode splits a fixed op budget across workers; steady-TPS mode lets each + // worker run for the full duration. + let ops_per_worker = if duration_secs > 0 { + 0 + } else { + operations / concurrency.max(1) + }; + if duration_secs == 0 && ops_per_worker == 0 { + return compute_result( + variant, + operation, + concurrency, + initial_entries, + duration_secs, + Duration::ZERO, + vec![], + 0, + ); + } + + let wall_start = Instant::now(); + let children: Vec<_> = (0..concurrency) + .map(|worker_id| { + let mut cmd = Command::new(self_exe); + cmd.arg("worker") + .arg("--root") + .arg(root) + .arg("--operation") + .arg(operation) + .arg("--operations") + .arg(ops_per_worker.to_string()) + .arg("--duration-secs") + .arg(duration_secs.to_string()) + .arg("--warmup") + .arg(warmup.to_string()) + .arg("--worker-id") + .arg(worker_id.to_string()) + .arg("--table-count") + .arg(table_count.to_string()) + .arg("--inline-optimization") + .arg(inline_optimization.to_string()); + for (k, v) in storage_options { + cmd.arg("--storage-option").arg(format!("{}={}", k, v)); + } + cmd.stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("Failed to spawn worker") + }) + .collect(); + + let mut all_latencies = Vec::new(); + let mut total_errors = 0; + for mut child in children { + let stdout = child.stdout.take().unwrap(); + for line in BufReader::new(stdout).lines() { + let line = line.expect("failed to read worker output"); + if let Ok(record) = serde_json::from_str::(&line) { + if record.error { + total_errors += 1; + } else { + all_latencies.push(record.latency_ms); + } + } + } + let status = child.wait().expect("failed to wait for worker"); + if !status.success() { + eprintln!("Worker exited with status: {}", status); + } + } + + compute_result( + variant, + operation, + concurrency, + initial_entries, + duration_secs, + wall_start.elapsed(), + all_latencies, + total_errors, + ) +} + +fn parse_concurrency_list(s: &str) -> Vec { + s.split(',') + .filter_map(|v| v.trim().parse::().ok()) + .filter(|v| *v > 0) + .collect() +} + +#[tokio::main] +async fn main() { + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: manifest_bench [options]"); + std::process::exit(1); + } + + let mode = args[1].as_str(); + let mut root = String::new(); + let mut operation = String::new(); + let mut operations: usize = 100; + let mut duration_secs: u64 = 0; + let mut warmup: usize = 0; + let mut concurrency_list = vec![1]; + let mut count: usize = 1000; + let mut worker_id: usize = 0; + let mut table_count: usize = 667; + let mut initial_entries: usize = 0; + let mut inline_optimization = true; + let mut variant = String::new(); + let mut storage_options: HashMap = HashMap::new(); + + let mut i = 2; + while i < args.len() { + match args[i].as_str() { + "--root" => { + root = args[i + 1].clone(); + i += 2; + } + "--operation" => { + operation = args[i + 1].clone(); + i += 2; + } + "--operations" => { + operations = args[i + 1].parse().unwrap(); + i += 2; + } + "--duration-secs" => { + duration_secs = args[i + 1].parse().unwrap(); + i += 2; + } + "--warmup" => { + warmup = args[i + 1].parse().unwrap(); + i += 2; + } + "--concurrency" => { + concurrency_list = parse_concurrency_list(&args[i + 1]); + i += 2; + } + "--count" => { + count = args[i + 1].parse().unwrap(); + i += 2; + } + "--worker-id" => { + worker_id = args[i + 1].parse().unwrap(); + i += 2; + } + "--table-count" => { + table_count = args[i + 1].parse().unwrap(); + i += 2; + } + "--initial-entries" => { + initial_entries = args[i + 1].parse().unwrap(); + i += 2; + } + "--inline-optimization" => { + inline_optimization = args[i + 1].parse().unwrap(); + i += 2; + } + "--variant" => { + variant = args[i + 1].clone(); + i += 2; + } + "--storage-option" => { + if let Some((k, v)) = args[i + 1].split_once('=') { + storage_options.insert(k.to_string(), v.to_string()); + } + i += 2; + } + other => { + eprintln!("Unknown argument: {}", other); + std::process::exit(1); + } + } + } + + if variant.is_empty() { + variant = if inline_optimization { + "inline_index".to_string() + } else { + "no_index".to_string() + }; + } + + match mode { + "seed-large" => { + seed_large(&root, count, inline_optimization, &storage_options).await; + } + "worker" => { + worker( + &root, + &operation, + operations, + duration_secs, + warmup, + worker_id, + table_count, + inline_optimization, + &storage_options, + ) + .await; + } + "run" => { + let self_exe = std::env::current_exe() + .expect("failed to get self exe path") + .to_string_lossy() + .to_string(); + let op = if operation.is_empty() { + "write-create-namespace" + } else { + operation.as_str() + }; + + eprintln!("=== Manifest commit benchmark ==="); + eprintln!( + "variant={} op={} root={} initial_entries={} concurrency={:?} operations={} duration_secs={}", + variant, op, root, initial_entries, concurrency_list, operations, duration_secs + ); + + for &concurrency in &concurrency_list { + let result = run_workers( + &self_exe, + &root, + op, + concurrency, + operations, + duration_secs, + warmup, + table_count, + initial_entries, + inline_optimization, + &variant, + &storage_options, + ); + eprintln!( + " c={} -> {:.2} ops/s ({} ops, {} errors, p50={:.0}ms p99={:.0}ms)", + concurrency, + result.throughput_ops_per_sec, + result.total_operations, + result.errors, + result.p50_latency_ms, + result.p99_latency_ms + ); + println!("{}", serde_json::to_string(&result).unwrap()); + } + eprintln!("=== complete ==="); + } + _ => { + eprintln!("Unknown mode: {}. Use seed-large, run, or worker.", mode); + std::process::exit(1); + } + } +} From 911dac3e08fff2be7287f4626adf601d31c7f4da Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Thu, 11 Jun 2026 09:07:15 -0700 Subject: [PATCH 3/3] fix(dir-catalog): commit __manifest via the dataset's own object store The native commit wrote the manifest through the namespace's own ObjectStore, which for stores like memory:// is a different instance than the one the manifest dataset reads from, so commits were invisible to reads (stale version -> endless conflict / not-found). Route the commit and cleanup through dataset.object_store(). --- .../lance-namespace-impls/src/dir/manifest.rs | 79 ++++++++++++++++--- 1 file changed, 70 insertions(+), 9 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index a86f5209c6e..067239b8765 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -1823,6 +1823,7 @@ impl ManifestNamespace { /// file) — otherwise it would orphan files a committed manifest still references. async fn cleanup_staged_manifest_files( &self, + object_store: &ObjectStore, data_files: &HashSet, index_uuids: &[Uuid], ) { @@ -1833,7 +1834,7 @@ impl ManifestNamespace { .join(LANCE_DATA_DIR); for path in data_files { let data_path = data_dir.clone().join(path.as_str()); - if let Err(err) = self.object_store.delete(&data_path).await { + if let Err(err) = object_store.delete(&data_path).await { log::warn!( "Failed to clean up uncommitted manifest rewrite data file '{}': {}", data_path, @@ -1841,12 +1842,13 @@ impl ManifestNamespace { ); } } - self.cleanup_uncommitted_manifest_index_dirs(index_uuids.iter().copied()) + self.cleanup_uncommitted_manifest_index_dirs(object_store, index_uuids.iter().copied()) .await; } async fn cleanup_uncommitted_manifest_index_dirs( &self, + object_store: &ObjectStore, index_uuids: impl IntoIterator, ) { for index_uuid in index_uuids { @@ -1856,7 +1858,7 @@ impl ManifestNamespace { .join(MANIFEST_TABLE_NAME) .join(LANCE_INDICES_DIR) .join(index_uuid.to_string()); - if let Err(err) = self.object_store.remove_dir_all(index_dir.clone()).await + if let Err(err) = object_store.remove_dir_all(index_dir.clone()).await && !matches!(err, LanceError::NotFound { .. }) { log::warn!( @@ -1898,6 +1900,13 @@ impl ManifestNamespace { manifest.set_timestamp(timestamp_nanos); manifest.update_max_fragment_id(); + // Commit through the dataset's own object store, not `self.object_store`: for + // stores like `memory://` the namespace and the dataset can hold different + // instances, and a commit written to the wrong one is invisible to reads. + let object_store = dataset + .object_store(None) + .await + .map_err(CommitError::from)?; let base_path = self.base_path.clone().join(MANIFEST_TABLE_NAME); let naming_scheme = dataset.manifest_location().naming_scheme; commit_handler @@ -1905,7 +1914,7 @@ impl ManifestNamespace { manifest, indices, &base_path, - &self.object_store, + &object_store, write_manifest_file_to_path, naming_scheme, Some((&transaction).into()), @@ -1987,6 +1996,9 @@ impl ManifestNamespace { let dataset_guard = self.manifest_dataset.get_refreshed().await?; let dataset = Arc::new(dataset_guard.clone()); drop(dataset_guard); + // Staged files, indices, the commit, and cleanup must all use the dataset's + // own object store (see `commit_manifest_overwrite`). + let object_store = dataset.object_store(None).await?; let source = Self::manifest_projected_stream(&dataset).await?; let resolution = make_mutation().conflict_resolution(); @@ -2040,7 +2052,7 @@ impl ManifestNamespace { .collect::>(); if !mutation.has_changes { - self.cleanup_staged_manifest_files(&staged_data_files, &[]) + self.cleanup_staged_manifest_files(&object_store, &staged_data_files, &[]) .await; return Ok(mutation.result); } @@ -2059,8 +2071,12 @@ impl ManifestNamespace { { Ok(indices) => Some(indices), Err(err) => { - self.cleanup_staged_manifest_files(&staged_data_files, &index_uuids) - .await; + self.cleanup_staged_manifest_files( + &object_store, + &staged_data_files, + &index_uuids, + ) + .await; return Err(err); } } @@ -2095,8 +2111,12 @@ impl ManifestNamespace { let _ = self.manifest_dataset.get_refreshed().await; return Ok(mutation.result); } - self.cleanup_staged_manifest_files(&staged_data_files, staged_index_uuids) - .await; + self.cleanup_staged_manifest_files( + &object_store, + &staged_data_files, + staged_index_uuids, + ) + .await; match err { CommitError::CommitConflict => { if let Some(output) = @@ -4586,6 +4606,47 @@ mod tests { ); } + #[tokio::test] + async fn test_manifest_commit_visible_on_memory_store() { + // Regression: the commit must use the same object store the manifest dataset reads + // from. On `memory://` the namespace store and the dataset store can be different + // in-memory instances, so a commit written to the wrong one is invisible to reads + // (manifests as stale version -> endless conflict / "not found"). + let manifest_ns = create_manifest_namespace("memory://test_commit_visible", false).await; + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table".to_string(), + object_type: ObjectType::Table, + location: Some("table.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + assert!(manifest_ns.manifest_contains_object("table").await.unwrap()); + // A second sequential commit must not falsely conflict. + manifest_ns + .insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: "table2".to_string(), + object_type: ObjectType::Table, + location: Some("table2.lance".to_string()), + metadata: None, + }], + None, + ) + .await + .unwrap(); + assert!( + manifest_ns + .manifest_contains_object("table2") + .await + .unwrap() + ); + } + #[tokio::test] async fn test_manifest_commit_uses_inline_transaction() { let temp_dir = TempStdDir::default();