From 6d42fc75f83c1c9e83f552b2086c91aeb45ffef6 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 19:11:03 +0200 Subject: [PATCH 1/9] chore: correct stale global-lock comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The global Arc> that once serialized every server write was removed — the server holds the engine as a lockless Arc and write methods are &self, so the per-(table_key, branch) write queues are now the actual write-serialization mechanism (in-process only). Correct comments that still claimed the global lock is 'still in place' / 'today', or framed the queues as MR-686 scaffolding: write_queue.rs module doc, exec/merge.rs, db/omnigraph/schema_apply.rs, db/manifest/recovery.rs, and the bench_concurrent_http.rs example (which also wrongly stated mutate_as is &mut self). workload.rs is left as-is — its 'previous global RwLock' wording is accurate history. --- .../examples/bench_concurrent_http.rs | 15 ++++++++------- crates/omnigraph/src/db/manifest/recovery.rs | 8 ++++---- .../src/db/omnigraph/schema_apply.rs | 8 ++++---- crates/omnigraph/src/db/write_queue.rs | 19 +++++++++++-------- crates/omnigraph/src/exec/merge.rs | 6 +++--- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/crates/omnigraph-server/examples/bench_concurrent_http.rs b/crates/omnigraph-server/examples/bench_concurrent_http.rs index 6a8411a3..044b2cec 100644 --- a/crates/omnigraph-server/examples/bench_concurrent_http.rs +++ b/crates/omnigraph-server/examples/bench_concurrent_http.rs @@ -1,14 +1,15 @@ //! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline). //! //! Drives concurrent `/change` requests against an in-process Omnigraph HTTP -//! server. Measures the global `Arc>` lock penalty on -//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline. +//! server. Originally written to measure the global `Arc>` +//! lock penalty as an MR-686 baseline; that lock has since been removed +//! (engine write APIs are `&self`, the server holds a lockless +//! `Arc`), so this now measures the concurrent write path itself +//! (per-`(table, branch)` queue contention + Lance I/O). //! -//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as` -//! is `&mut self`, so an engine-level concurrent bench either serializes on the -//! borrow checker (measures nothing) or drives multiple handles (measures Lance -//! contention, not the server bottleneck). Driving the HTTP server is the only -//! way to measure the actual `RwLock` contention this work removes. +//! Driving the HTTP server is still the right level: an engine-level bench on +//! a single handle measures Lance contention, not the server's request-path +//! concurrency. //! //! Usage: //! ```sh diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index d49e86a9..968d3f4d 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -793,10 +793,10 @@ pub(crate) fn schema_apply_serial_queue_key() -> crate::db::write_queue::TableQu /// same table append extra Lance restore commits which `omnigraph /// cleanup` reclaims. /// -/// Concurrency: today recovery runs synchronously in `Omnigraph::open` -/// *before* the engine is wrapped in the server's `Arc>`. -/// No request handlers can race, so this sweep does NOT acquire write -/// queues. In-process callers (refresh, write entry points) must use +/// Concurrency: the open-time sweep runs synchronously in `Omnigraph::open` +/// before the engine handle is published to any caller, so no request +/// handler can race it and it does NOT acquire write queues. In-process +/// callers (refresh, write entry points) must use /// [`heal_pending_sidecars_roll_forward`] instead, which serializes /// against live writers via per-(table_key, branch) queue acquisition. pub(crate) async fn recover_manifest_drift( diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index f965ad46..e0a8fd15 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -432,10 +432,10 @@ where // manifest publish via `commit_changes_with_actor` below. // // Schema-apply already holds the graph-wide `__schema_apply_lock__` - // sentinel branch, so under PR 1b's intermediate state these - // per-table acquisitions are uncontended. They exist for symmetry - // with future MR-870 recovery, which will need queue acquisition - // before any `Dataset::restore` it issues for SchemaApply sidecars. + // sentinel branch, so these per-table acquisitions are uncontended in + // practice. They exist for symmetry with the recovery reconciler, which + // acquires the same queues before any `Dataset::restore` it issues for + // SchemaApply sidecars. let mut schema_apply_queue_keys: Vec<(String, Option)> = recovery_pins .iter() .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) diff --git a/crates/omnigraph/src/db/write_queue.rs b/crates/omnigraph/src/db/write_queue.rs index 1f0c53a6..18a14d1c 100644 --- a/crates/omnigraph/src/db/write_queue.rs +++ b/crates/omnigraph/src/db/write_queue.rs @@ -1,12 +1,15 @@ -//! Per-`(table_key, branch)` writer queues — MR-686 scaffolding. +//! Per-`(table_key, branch)` writer queues. //! -//! Today every server-layer write serializes on the global -//! `Arc>` in `AppState`. MR-686 replaces that with -//! per-`(table_key, branch_ref)` queues so disjoint-key writes proceed -//! concurrently. This module owns the queue data structure; callers in -//! `MutationStaging::commit_all`, `branch_merge`, `schema_apply`, -//! `ensure_indices`, `delete_where`, and the future MR-870 recovery -//! reconciler acquire guards before any per-table Lance commit. +//! These queues are the engine's write-serialization mechanism: the server +//! holds the engine as a lockless `Arc` (writes are `&self`), so +//! disjoint-key writes proceed concurrently and only writes to the same +//! `(table_key, branch_ref)` serialize here. This module owns the queue +//! data structure; callers in `MutationStaging::commit_all`, `branch_merge`, +//! `schema_apply`, `ensure_indices`, `delete_where`, the fork path (first +//! write to a table on a branch — acquired before the fork, held through the +//! manifest publish), and the recovery reconciler acquire guards before any +//! per-table Lance commit. Serialization is in-process only; cross-process +//! writers on one graph remain one-winner-CAS at the manifest publish. //! //! ## Why exclusive `tokio::sync::Mutex<()>` per key //! diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index ea16b155..5d0be74e 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1323,9 +1323,9 @@ impl Omnigraph { // branch_merge writes only to the target branch. // // Held across the per-table publish loop and the manifest - // commit + record_merge_commit calls below. Under PR 1b's - // intermediate state (global server RwLock still in place), - // this acquisition is uncontended. + // commit + record_merge_commit calls below, so no concurrent + // writer to a touched (table, target_branch) can interleave + // between our commit_staged and our publish. let active_branch_for_keys = self.active_branch().await; let merge_queue_keys: Vec<(String, Option)> = ordered_table_keys .iter() From 3edbe2e7715cedfa1df32c9bf48c30d4d3b226e6 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 19:11:15 +0200 Subject: [PATCH 2/9] test: regression for self-healing a manifest-unreferenced fork MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An interrupted first-write fork (create_branch succeeded, the manifest publish did not) leaves a fully-formed Lance branch ref the manifest never references. The branch stays a valid manifest branch, so cleanup's reconciler never reclaims it, and today the next write to that table wedges with 'incomplete prior delete; run cleanup'. Forge that exact residue (a live 'feature' branch + a directly-created 'feature' ref on the Person table the manifest doesn't reference) and assert the next load AND mutate self-heal. Deterministic and local — no S3 or timing, since the forge IS the post-crash state. Adds a shared node_table_uri helper. This commit is RED: it reproduces the bug and fails against the unfixed engine with the predicted symptom. The fix follows in the next commit. --- crates/omnigraph/tests/helpers/mod.rs | 13 ++++++ crates/omnigraph/tests/writes.rs | 59 +++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 295cab74..6476e1ae 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -54,6 +54,19 @@ pub async fn init_and_load(dir: &tempfile::TempDir) -> Omnigraph { db } +/// On-disk Lance dataset URI for a node type, mirroring the engine's +/// `nodes/{fnv1a(type)}` layout. Used by tests that reach the raw Lance +/// dataset to forge or inspect branch state. (Local copies exist in +/// `failpoints.rs` / `maintenance.rs`; this is the shared one for new tests.) +pub fn node_table_uri(root: &str, type_name: &str) -> String { + let mut hash: u64 = 0xcbf2_9ce4_8422_2325; + for &b in type_name.as_bytes() { + hash ^= b as u64; + hash = hash.wrapping_mul(0x100_0000_01b3); + } + format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) +} + /// Read all rows from a sub-table by table_key. pub async fn read_table(db: &Omnigraph, table_key: &str) -> Vec { let snap = snapshot_main(db).await.unwrap(); diff --git a/crates/omnigraph/tests/writes.rs b/crates/omnigraph/tests/writes.rs index b006f4c1..6a41cd1b 100644 --- a/crates/omnigraph/tests/writes.rs +++ b/crates/omnigraph/tests/writes.rs @@ -1540,3 +1540,62 @@ async fn second_sequential_update_on_same_row_succeeds() { "Alice's age must reflect the second update" ); } + +// An interrupted first-write fork (create_branch succeeded, the manifest +// publish did not) leaves a fully-formed Lance branch ref on the table that +// the manifest never references — a "manifest-unreferenced fork". The branch +// itself stays a valid manifest branch, so `cleanup`'s reconciler (keyed on +// the manifest branch list) never reclaims it. Today the next write to that +// table on that branch re-enters the fork path, `create_branch` collides, and +// the engine wedges with "incomplete prior delete; run `omnigraph cleanup`". +// +// We forge that exact residue (a live `feature` branch + a directly-created +// `feature` ref on the Person table the manifest doesn't reference) and assert +// the next write — via both `load` and `mutate` — self-heals by reclaiming the +// orphan fork and re-forking, rather than wedging. No process death / timing +// needed: the forge is the post-crash state. +#[tokio::test] +async fn first_write_self_heals_manifest_unreferenced_fork_on_live_branch() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = init_and_load(&dir).await; + db.branch_create("feature").await.unwrap(); + + // Forge the manifest-unreferenced fork directly at the Lance layer. + let person_uri = node_table_uri(&uri, "Person"); + { + let mut ds = lance::Dataset::open(&person_uri).await.unwrap(); + let base = ds.version().version; + ds.create_branch("feature", base, None).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("feature"), + "precondition: forged orphan fork present on Person" + ); + } + + // load → must self-heal, not wedge with "incomplete prior delete". + let row = r#"{"type":"Person","data":{"name":"Zoe","age":30}}"#; + db.load_as("feature", None, row, LoadMode::Merge, None) + .await + .expect("load onto a manifest-unreferenced fork must self-heal, not wedge"); + + // mutate → same path, must also self-heal. + mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Yan")], &[("$age", 41)]), + ) + .await + .expect("mutate onto a manifest-unreferenced fork must self-heal"); + + // The healed branch holds the new rows; main is untouched (still no Zoe/Yan). + let feature_people = count_rows_branch(&db, "feature", "node:Person").await; + let main_people = count_rows(&db, "node:Person").await; + assert!( + feature_people >= main_people + 2, + "feature must contain the two new rows on top of the inherited set \ + (feature={feature_people}, main={main_people})" + ); +} From 6e3204d3145378c0a3755c8807f8f5f2d7fe8eca Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 19:11:37 +0200 Subject: [PATCH 3/9] fix: self-heal manifest-unreferenced branch forks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first write to a table on a branch lazily forks it via Lance create_branch, a durable two-phase op that advances Lance state BEFORE the atomic manifest publish. If the writer dies or its request future is cancelled between the fork and the publish, the branch ref is fully formed but the manifest never references it. The next write re-enters the fork path, create_branch collides, and the engine wedged with 'orphaned table state ... incomplete prior delete; run cleanup' — which cleanup could not even fix, because the branch is still a live manifest branch. This hit load, mutate, ingest, and the merge fork path (one shared engine chokepoint), so a routine deploy restart or client disconnect could wedge a branch. Fix: treat the per-table fork ref as derived state of the manifest. fork_branch_ from_state returns a typed ForkOutcome instead of a human 'incomplete prior delete' error; on RefAlreadyExists the db layer reclaims the manifest- unreferenced fork (force_delete_branch + re-fork, exactly once) and proceeds. A live committed fork is still routed to a retryable conflict before the fork path, so concurrent first-writes stay correct. Reclaim is only safe if no in-process writer can be mid-fork, so the write entry points (load, mutate) acquire the per-(table, branch) write queues for all touched tables up front — before the fork, held through the publish — when forking a non-main branch. commit_all accepts these pre-held guards instead of re-acquiring (the queue is non-re-entrant). The merge fork path already holds the queue and self-heals through the shared wrapper. Cross-process in-flight forks remain the documented one-winner-CAS gap. Mechanical prep folded in: mutation IR lowering is hoisted so the touched-table set is known before execution; commit_all gains the held_guards parameter. Flips recreate_over_orphaned_fork_before_cleanup_is_actionable to assert self-heal; fork_collision_with_live_concurrent_fork_is_retryable still holds. Docs: writes.md cancelled-future note, invariants.md cross-process known gap. --- crates/omnigraph/src/db/omnigraph.rs | 34 +++++- .../omnigraph/src/db/omnigraph/table_ops.rs | 82 +++++++++++++- crates/omnigraph/src/exec/mutation.rs | 103 ++++++++++++++++-- crates/omnigraph/src/exec/staging.rs | 59 +++++++--- crates/omnigraph/src/loader/mod.rs | 47 +++++++- crates/omnigraph/src/storage_layer.rs | 43 ++++++-- crates/omnigraph/src/table_store.rs | 29 +++-- crates/omnigraph/tests/failpoints.rs | 35 +++--- docs/dev/invariants.md | 16 +++ docs/dev/writes.md | 10 +- 10 files changed, 373 insertions(+), 85 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 779a2e04..8c46d144 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -113,10 +113,11 @@ pub struct Omnigraph { /// Read-heavy on schema introspection paths, written only by /// `apply_schema`. Same ArcSwap rationale as `catalog`. schema_source: Arc>, - /// Per-`(table_key, branch)` writer queues. Reachable from engine - /// internals (mutation finalize, schema_apply, branch_merge, - /// ensure_indices, delete_where) and from future MR-870 recovery - /// reconciler. PR 1b adds the field; callers acquire in commits 4+. + /// Per-`(table_key, branch)` writer queues — the engine's + /// write-serialization mechanism (the server holds the engine as a + /// lockless `Arc`). Reachable from engine internals + /// (mutation finalize, schema_apply, branch_merge, ensure_indices, + /// delete_where, the fork path, recovery reconciler). write_queue: Arc, /// Process-wide mutex held across the swap → operate → restore window /// in `branch_merge_impl`. Two concurrent merges with distinct targets @@ -1479,6 +1480,13 @@ impl Omnigraph { table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await } + /// Fork `table_key` onto `active_branch` from the given source state, + /// self-healing a manifest-unreferenced leftover fork if one is in the + /// way. Callers that reach this MUST already hold the per-`(table_key, + /// active_branch)` write queue (so the reclaim cannot race an in-process + /// fork) and must have confirmed via the live manifest that the table is + /// not yet on `active_branch`. Both the first-write fork path + /// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this. pub(crate) async fn fork_dataset_from_entry_state( &self, table_key: &str, @@ -1487,7 +1495,7 @@ impl Omnigraph { source_version: u64, active_branch: &str, ) -> Result { - table_ops::fork_dataset_from_entry_state( + match table_ops::fork_dataset_from_entry_state( self, table_key, full_path, @@ -1495,7 +1503,21 @@ impl Omnigraph { source_version, active_branch, ) - .await + .await? + { + crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds), + crate::storage_layer::ForkOutcome::RefAlreadyExists => { + table_ops::reclaim_orphaned_fork_and_refork( + self, + table_key, + full_path, + source_branch, + source_version, + active_branch, + ) + .await + } + } } pub(crate) async fn reopen_for_mutation( diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index f7a365af..9e754148 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -160,9 +160,8 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st // that needs index work. Held across the per-table commit loop and // the manifest publish at the end of this function. Sorted-order // acquisition prevents lock-order inversion against concurrent - // multi-table writers (mutation finalize, branch_merge, future - // MR-870 recovery). Under PR 1b's intermediate state (global server - // RwLock still in place), this acquisition is uncontended. + // multi-table writers (mutation finalize, branch_merge, the fork + // path, recovery). let queue_keys: Vec<(String, Option)> = recovery_pins .iter() .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) @@ -499,8 +498,14 @@ pub(super) async fn open_owned_dataset_for_branch_write( )); } } - fork_dataset_from_entry_state( - db, + // The fork advances Lance state before the manifest publish. The + // caller holds the per-(table, active_branch) write queue from + // before this fork through the publish, so a leftover ref is a + // manifest-unreferenced fork (interrupted prior fork, or + // delete+recreate), not a live in-process fork. The wrapper + // self-heals it (reclaim + re-fork); see + // `Omnigraph::fork_dataset_from_entry_state`. + db.fork_dataset_from_entry_state( table_key, full_path, source_branch, @@ -528,7 +533,7 @@ pub(super) async fn fork_dataset_from_entry_state( source_branch: Option<&str>, source_version: u64, active_branch: &str, -) -> Result { +) -> Result> { db.storage() .fork_branch_from_state( full_path, @@ -540,6 +545,71 @@ pub(super) async fn fork_dataset_from_entry_state( .await } +/// Reclaim a manifest-unreferenced fork and re-fork in its place. +/// +/// Reached only when `fork_branch_from_state` reported `RefAlreadyExists` +/// AND the caller already proved (live-manifest authority re-check, under the +/// held per-`(table, active_branch)` write queue) that the manifest does not +/// place this table on `active_branch`. So the leftover ref is reclaimable +/// derived state: drop it (idempotent `force_delete_branch`) and re-fork, +/// exactly once. A second collision means a concurrent *foreign-process* +/// writer recreated the ref (in-process is impossible while we hold the +/// queue) — the documented one-winner-CAS gap — so surface a retryable +/// conflict; on retry the winner's fork is visible and the no-fork path runs. +pub(super) async fn reclaim_orphaned_fork_and_refork( + db: &Omnigraph, + table_key: &str, + full_path: &str, + source_branch: Option<&str>, + source_version: u64, + active_branch: &str, +) -> Result { + crate::failpoints::maybe_fail("fork.before_reclaim")?; + db.storage() + .force_delete_branch(full_path, active_branch) + .await + .map_err(|e| { + // Lance refuses to delete a branch with dependent child branches + // even under force (`refs.rs` RefConflict "referenced by N + // versions"). Unreachable for a leaf first-write fork, but surface + // it actionably instead of looping. + if e.to_string().contains("referenced by") { + OmniError::manifest_conflict(format!( + "branch '{active_branch}' cannot reclaim the leftover fork for \ + table '{table_key}' because it has dependent child branches; \ + delete the child branches (or run `omnigraph cleanup`) first" + )) + } else { + e + } + })?; + + match fork_dataset_from_entry_state( + db, + table_key, + full_path, + source_branch, + source_version, + active_branch, + ) + .await? + { + crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds), + crate::storage_layer::ForkOutcome::RefAlreadyExists => { + let live = db.snapshot_for_branch(Some(active_branch)).await?; + let actual = live + .entry(table_key) + .map(|e| e.table_version) + .unwrap_or(source_version); + Err(OmniError::manifest_expected_version_mismatch( + table_key, + source_version, + actual, + )) + } + } +} + pub(super) async fn reopen_for_mutation( db: &Omnigraph, table_key: &str, diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index e9051c43..8fed05db 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -741,14 +741,45 @@ impl Omnigraph { // tables. Branch is threaded explicitly — no coordinator swap. let mut staging = MutationStaging::default(); + // Lower + validate up front so the touched-table set is known before + // execution. A lowering/validation error returns exactly as it did + // when this happened inside execute_named_mutation. + let ir = self.lower_named_mutation(query_source, query_name)?; + + // Up-front fork-queue acquisition (see the loader for the full + // rationale): if this mutation will fork any touched table onto a + // non-main branch, acquire the per-(table, branch) write queues for + // every touched table before the first fork and hold them through the + // publish, so the orphan-fork reclaim can't race a concurrent + // in-process fork. The touched set is derived from the lowered IR. + let fork_queue_guards: Option<( + Vec<(String, Option)>, + Vec>, + )> = if let Some(active) = requested.as_deref() { + let snapshot = self.snapshot_for_branch(Some(active)).await?; + let touched: Vec<(String, Option)> = self + .touched_table_keys(&ir) + .into_iter() + .map(|k| (k, Some(active.to_string()))) + .collect(); + let needs_fork = touched.iter().any(|(table_key, _)| { + snapshot + .entry(table_key) + .map(|e| e.table_branch.as_deref() != Some(active)) + .unwrap_or(false) + }); + if needs_fork { + let guards = self.write_queue().acquire_many(&touched).await; + Some((touched, guards)) + } else { + None + } + } else { + None + }; + let exec_result = self - .execute_named_mutation( - query_source, - query_name, - &resolved_params, - requested.as_deref(), - &mut staging, - ) + .execute_named_mutation(&ir, &resolved_params, requested.as_deref(), &mut staging) .await; match exec_result { @@ -768,6 +799,7 @@ impl Omnigraph { requested.as_deref(), crate::db::manifest::SidecarKind::Mutation, actor_id, + fork_queue_guards, ) .await?; // Failpoint that wedges the documented finalize→publisher @@ -817,14 +849,19 @@ impl Omnigraph { } } - async fn execute_named_mutation( + /// Lower + validate a named mutation query into its IR. + /// + /// Hoisted out of [`Self::execute_named_mutation`] so the caller can + /// inspect the IR before execution — specifically to compute the + /// touched-table set (see [`Self::touched_table_keys`]) for up-front + /// write-queue acquisition. Performs the same find → typecheck → lower + /// → D₂ checks that execution previously did inline, so error behavior + /// is unchanged. + fn lower_named_mutation( &self, query_source: &str, query_name: &str, - params: &ParamMap, - branch: Option<&str>, - staging: &mut MutationStaging, - ) -> Result { + ) -> Result { let query_decl = omnigraph_compiler::find_named_query(query_source, query_name) .map_err(|e| OmniError::manifest(e.to_string()))?; @@ -841,7 +878,49 @@ impl Omnigraph { let ir = lower_mutation_query(&query_decl)?; // D₂: reject mixed insert/update + delete before any I/O. enforce_no_mixed_destructive_constructive(&ir)?; + Ok(ir) + } + /// The set of `(node|edge):{type}` table keys a mutation IR touches, as + /// they will be keyed in `MutationStaging`/`commit_all`. Mirrors the + /// node-vs-edge dispatch the execute paths use (`node_types` first, then + /// `edge_types`), so the keys match what `commit_all` recomputes. + /// Unknown types are skipped (the execute path surfaces the error); + /// they must not be locked. Sorted + deduped for one-shot + /// `acquire_many`. Used by the up-front fork-queue acquisition. + fn touched_table_keys(&self, ir: &omnigraph_compiler::ir::MutationIR) -> Vec { + use omnigraph_compiler::ir::MutationOpIR; + let catalog = self.catalog(); + let mut keys: Vec = ir + .ops + .iter() + .filter_map(|op| { + let type_name = match op { + MutationOpIR::Insert { type_name, .. } + | MutationOpIR::Update { type_name, .. } + | MutationOpIR::Delete { type_name, .. } => type_name, + }; + if catalog.node_types.contains_key(type_name) { + Some(format!("node:{type_name}")) + } else if catalog.edge_types.contains_key(type_name) { + Some(format!("edge:{type_name}")) + } else { + None + } + }) + .collect(); + keys.sort(); + keys.dedup(); + keys + } + + async fn execute_named_mutation( + &self, + ir: &omnigraph_compiler::ir::MutationIR, + params: &ParamMap, + branch: Option<&str>, + staging: &mut MutationStaging, + ) -> Result { let mut total = MutationResult::default(); for op in &ir.ops { let result = match op { diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index cbfd52d4..f84b9af2 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -463,12 +463,28 @@ impl StagedMutation { /// unreferenced (cleaned by `cleanup_old_versions`'s age sweep) /// rather than being committed and creating a Lance-HEAD-ahead /// residual. + /// `held_guards`: when the caller already holds the per-`(table_key, + /// branch)` write queues for every touched table (the fork path acquires + /// them up front, before the fork, and holds them through the manifest + /// publish), it passes `(acquired_keys, guards)` here so `commit_all` + /// reuses them instead of re-acquiring — the queue is a non-re-entrant + /// `tokio::Mutex`, so re-acquiring a held key would self-deadlock. + /// `None` (the steady-state path) means `commit_all` acquires them + /// itself. `acquired_keys` must cover every key `commit_all` would + /// acquire (debug-asserted below) — the guards from `acquire_many` don't + /// carry their keys, so the caller hands the key set alongside them. The + /// fork path guarantees coverage by keying every touched table uniformly + /// by the resolved target branch. pub(crate) async fn commit_all( self, db: &crate::db::Omnigraph, branch: Option<&str>, sidecar_kind: SidecarKind, actor_id: Option<&str>, + held_guards: Option<( + Vec<(String, Option)>, + Vec>, + )>, ) -> Result<( Vec, HashMap, @@ -483,21 +499,18 @@ impl StagedMutation { op_kinds, } = self; - // Acquire per-(table_key, branch) queues for every touched - // table — both staged and inline-committed. Sorted by - // `acquire_many` internally so all multi-table writers - // (mutation, branch_merge, schema_apply, future MR-870 - // recovery) agree on acquisition order — prevents lock-order - // inversion deadlock. + // Per-(table_key, branch) queues for every touched table — both + // staged and inline-committed. Sorted by `acquire_many` internally + // so all multi-table writers (mutation, branch_merge, schema_apply, + // the fork path, recovery) agree on acquisition order — prevents + // lock-order inversion deadlock. // - // For inline-committed tables (delete-only mutations), Lance - // HEAD has already advanced inside `delete_where` before - // `commit_all` runs. Holding the queue here doesn't prevent - // that interleaving (commit 6 will move queue acquisition into - // `delete_where`'s call site); it does prevent another writer - // from interleaving between our delete and our publish, which - // would otherwise leave a Lance-HEAD-ahead residual the - // delete-only sidecar (added below) would have to recover. + // For inline-committed tables (delete-only mutations), Lance HEAD + // has already advanced inside `delete_where` before `commit_all` + // runs. Holding the queue here prevents another writer from + // interleaving between our delete and our publish, which would + // otherwise leave a Lance-HEAD-ahead residual the delete-only + // sidecar (added below) would have to recover. let mut queue_keys: Vec<(String, Option)> = Vec::with_capacity(staged.len() + inline_committed.len()); for entry in &staged { @@ -512,7 +525,23 @@ impl StagedMutation { })?; queue_keys.push((table_key.clone(), path.table_branch.clone())); } - let guards = db.write_queue().acquire_many(&queue_keys).await; + // Reuse the caller's guards (fork path) when handed in, else acquire + // our own. When reusing, every key we would acquire MUST already be + // covered — re-acquiring a held non-re-entrant key would deadlock. + let guards = match held_guards { + Some((acquired_keys, guards)) => { + debug_assert!( + { + let held: std::collections::HashSet<&(String, Option)> = + acquired_keys.iter().collect(); + queue_keys.iter().all(|k| held.contains(k)) + }, + "commit_all: held_guards must cover every touched-table queue key" + ); + guards + } + None => db.write_queue().acquire_many(&queue_keys).await, + }; // Re-capture manifest pins under the queue (PR 2 / MR-686). // diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 69ada797..2365243d 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -418,6 +418,45 @@ async fn load_jsonl_reader( LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, }; + // Up-front fork-queue acquisition. The first write to a table on a + // non-main branch forks it (create_branch), which advances Lance state + // before the manifest publish; the reclaim of any manifest-unreferenced + // leftover (`reclaim_orphaned_fork_and_refork`) must not race a concurrent + // in-process fork. So when this load will fork at least one touched table, + // acquire the per-(table, branch) write queues for ALL touched tables up + // front (one sorted `acquire_many`, keyed uniformly by the target branch + // so it covers what `commit_all` recomputes) and hold them through the + // publish. Main-branch loads never fork; branch loads where every touched + // table is already forked skip this and let `commit_all` acquire at commit. + let fork_queue_guards: Option<( + Vec<(String, Option)>, + Vec>, + )> = if let Some(active) = branch { + let touched: Vec<(String, Option)> = node_rows + .keys() + .map(|t| (format!("node:{t}"), Some(active.to_string()))) + .chain( + edge_rows + .keys() + .map(|e| (format!("edge:{e}"), Some(active.to_string()))), + ) + .collect(); + let needs_fork = touched.iter().any(|(table_key, _)| { + snapshot + .entry(table_key) + .map(|e| e.table_branch.as_deref() != Some(active)) + .unwrap_or(false) + }); + if needs_fork { + let guards = db.write_queue().acquire_many(&touched).await; + Some((touched, guards)) + } else { + None + } + } else { + None + }; + // Phase 2a: build and validate every node batch up front. Cheap and // synchronous — surfaces validation errors before any S3 traffic. let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> = @@ -551,7 +590,13 @@ async fn load_jsonl_reader( // across the manifest publish below — see exec/mutation.rs for // the rationale (interleaving prevention). let (updates, expected_versions, sidecar_handle, _queue_guards) = staged - .commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id) + .commit_all( + db, + branch, + crate::db::manifest::SidecarKind::Load, + actor_id, + fork_queue_guards, + ) .await?; // Same finalize → publisher residual as mutations: per-table // staged commits have advanced Lance HEAD, but the manifest diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index d2f6b011..ef31ed3c 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -184,6 +184,22 @@ pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec { + Created(D), + RefAlreadyExists, +} + // ─── TableStorage trait ──────────────────────────────────────────────────── /// Engine-internal trait covering every Lance dataset operation an @@ -231,7 +247,7 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { table_key: &str, source_version: u64, target_branch: &str, - ) -> Result; + ) -> Result>; async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>; @@ -497,17 +513,22 @@ impl TableStorage for TableStore { table_key: &str, source_version: u64, target_branch: &str, - ) -> Result { - TableStore::fork_branch_from_state( - self, - dataset_uri, - source_branch, - table_key, - source_version, - target_branch, + ) -> Result> { + Ok( + match TableStore::fork_branch_from_state( + self, + dataset_uri, + source_branch, + table_key, + source_version, + target_branch, + ) + .await? + { + ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)), + ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists, + }, ) - .await - .map(SnapshotHandle::new) } async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> { diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 65123c00..319f5a29 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write}; use crate::db::{Snapshot, SubTableEntry}; use crate::error::{OmniError, Result}; +use crate::storage_layer::ForkOutcome; #[derive(Debug, Clone, PartialEq, Eq)] pub struct TableState { @@ -285,7 +286,7 @@ impl TableStore { table_key: &str, source_version: u64, target_branch: &str, - ) -> Result { + ) -> Result> { let mut source_ds = self .open_dataset_head(dataset_uri, source_branch) .await? @@ -299,26 +300,24 @@ impl TableStore { .await .is_err() { - // The target branch ref already exists. The caller - // (`open_owned_dataset_for_branch_write`) re-reads the live manifest - // before forking and returns a retryable error when a concurrent - // writer legitimately holds the fork, so reaching here means the - // manifest does NOT reference this fork: it is an orphan from an - // incomplete prior `branch_delete`. Surface the actionable cleanup - // error rather than guessing from Lance branch versions. - return Err(OmniError::manifest_conflict(format!( - "branch '{}' has orphaned table state for '{}' from an incomplete \ - prior delete; run `omnigraph cleanup` to reclaim it before reusing \ - this branch name", - target_branch, table_key - ))); + // A branch ref for `target_branch` already exists on this table (a + // fully-formed manifest-unreferenced fork, or a phase-1-only Lance + // "zombie" — `create_branch` fails for both). The caller + // (`open_owned_dataset_for_branch_write`) has already re-read the + // live manifest under the held write queue and confirmed it does + // NOT place this table on `target_branch`, so the ref is reclaimable + // derived state, not authoritative. Return the typed signal and let + // the db layer reclaim + re-fork rather than guessing or hard-failing + // here. (A live committed fork is routed to a retryable conflict + // upstream, before we ever reach this fork path.) + return Ok(ForkOutcome::RefAlreadyExists); } let ds = self .open_dataset_head(dataset_uri, Some(target_branch)) .await?; self.ensure_expected_version(&ds, table_key, source_version)?; - Ok(ds) + Ok(ForkOutcome::Created(ds)) } pub async fn scan_batches(&self, ds: &Dataset) -> Result> { diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index b45cfa0a..cccf222a 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -127,12 +127,12 @@ async fn branch_delete_partial_failure_converges_via_cleanup() { } // Reusing a branch name whose delete left an orphaned fork (before `cleanup` -// reconciles it) must fail with a clear, actionable error pointing at -// `cleanup`, not the opaque `ExpectedVersionMismatch` that leaks from the fork -// path. The recreate itself succeeds; the first write to the previously-forked -// table is where the stale orphan collides. +// reconciles it) must SELF-HEAL on the next write — the write reclaims the +// manifest-unreferenced fork and re-forks, rather than wedging with "incomplete +// prior delete; run cleanup". (This test was the inverse before the fork-as- +// idempotent-reconcile fix; its flip is the signal the bug class is closed.) #[tokio::test] -async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() { +async fn recreate_over_orphaned_fork_self_heals_without_cleanup() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); @@ -158,10 +158,10 @@ async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() { } // Recreate the name and write to the previously-forked table WITHOUT a - // cleanup in between. + // cleanup in between. The write must self-heal the stale orphan fork. main.branch_create("feature").await.unwrap(); let mut feature2 = Omnigraph::open(&uri).await.unwrap(); - let err = helpers::mutate_branch( + helpers::mutate_branch( &mut feature2, "feature", MUTATION_QUERIES, @@ -169,17 +169,18 @@ async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() { &mixed_params(&[("$name", "Frank")], &[("$age", 41)]), ) .await - .expect_err("write should collide with the stale orphaned fork"); + .expect("recreate-over-orphan write must self-heal, not require cleanup"); - let msg = err.to_string(); - assert!( - msg.contains("cleanup") - && (msg.contains("orphan") || msg.contains("incomplete prior delete")), - "expected an actionable orphaned-fork error pointing at cleanup, got: {msg}" - ); - assert!( - !msg.contains("expected manifest table version"), - "should not surface the opaque ExpectedVersionMismatch, got: {msg}" + // The recreated branch forks FRESH from main: the deleted branch's Eve is + // gone and only the new Frank is added on top of main's seed. A count of + // main + 2 would mean Eve resurrected from the stale fork (the bug). + let main_people = helpers::count_rows(&main, "node:Person").await; + let feature_people = helpers::count_rows_branch(&feature2, "feature", "node:Person").await; + assert_eq!( + feature_people, + main_people + 1, + "self-healed feature must fork fresh from main (+Frank only); \ + main={main_people}, feature={feature_people} (main+2 ⇒ Eve resurrected)" ); } diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index a0bcc6de..d1454464 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -160,6 +160,22 @@ them explicit. one-winner-CAS territory; closing this fully needs a cross-process serialization primitive (e.g. lease-based use of the schema-apply lock branch) — design it before promoting multi-process write topologies. +- **Fork reclaim is in-process-safe only:** the first write to a table on a + branch forks it (a Lance `create_branch` that advances state before the + manifest publish). An interrupted fork (crash, or a cancelled request + future) leaves a manifest-unreferenced branch ref. The next write self-heals + it — `reclaim_orphaned_fork_and_refork` (`force_delete_branch` + re-fork) + — but reclaim is only safe because the writer holds the per-`(table, + branch)` write queue from before the fork through the publish AND re-checks + the live manifest under it, so no *in-process* writer can be mid-fork. A + reclaim cannot serialize against a foreign-*process* in-flight fork: it may + force-delete a peer's just-created ref, which makes that peer's commit fail + and retry — the same one-winner-CAS exposure as above, not corruption. The + reclaim never fires unless in-process-queue + manifest authority both prove + the ref is manifest-unreferenced. `cleanup`'s per-table reconciler + (`reconcile_orphaned_branches`) is the guaranteed backstop for any fork the + write path never revisits. Both degrade to a no-op if Lance ships an atomic + multi-dataset branch op. - **Local `write_text_if_match` is not a cross-process CAS:** object-store backends use a true conditional put (ETag If-Match; the in-memory test backend too), but upstream `object_store` leaves `PutMode::Update` diff --git a/docs/dev/writes.md b/docs/dev/writes.md index c3511e07..cd2c701f 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -19,8 +19,14 @@ publisher's row-level CAS on `__manifest` is the single fence. `__run__*` branch on an upgraded graph is swept off `__manifest` by the v2→v3 internal-schema migration on first read-write open. (The inert `_graph_runs.lance` bytes remain until a `delete_prefix` primitive lands.) -- Cancelled mutation futures leave **no graph-level state** — only orphaned - Lance fragments, which the existing `omnigraph cleanup` pipe reclaims. +- Cancelled mutation futures leave **no graph-visible state** — the manifest + is never advanced. They can leave two kinds of unreferenced residue, both + self-healing: orphaned Lance fragments (reclaimed by `omnigraph cleanup`), + and — on the *first* write to a table on a branch, which forks it before the + publish — a manifest-unreferenced branch ref. The next write to that table + reclaims the stale fork and re-forks (`reclaim_orphaned_fork_and_refork`), + and `cleanup`'s per-table reconciler is the guaranteed backstop; see the + fork-reclaim note in [invariants.md](invariants.md). ## Read-your-writes within a multi-statement mutation From 37f5af587afecc9607b9bead6e9bce30805852ec Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 19:11:47 +0200 Subject: [PATCH 4/9] fix(cleanup): reconcile per-table manifest-unreferenced forks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reconcile_orphaned_branches keyed orphans on the branch NAME (absent from the manifest), so it only reclaimed forks from a fully-deleted branch. A fork left on a still-live branch by an interrupted first-write was never reclaimed — the backstop the handoff expected cleanup to provide did not cover that case. Broaden it to a per-table authority test: a Lance branch B on table T is an orphan iff B is not a live manifest branch (delete-leftover) OR the manifest's branch-B snapshot does not place T on B (interrupted first-write). Per-branch snapshots are resolved once and cached across tables. Legitimately-forked tables, main, and internal/system branches are never reclaimed; children are dropped before parents to avoid Lance's referenced-parent RefConflict. The commit-graph half stays whole-branch (per-table doesn't apply there). This is the guaranteed-convergence backstop to the write-path self-heal: it reclaims any fork the write path never revisits, and is what Lance's own create_branch docstring asks embedders to provide for zombie/orphan refs. --- crates/omnigraph/src/db/omnigraph/optimize.rs | 100 ++++++++++++++---- crates/omnigraph/tests/maintenance.rs | 70 ++++++++++++ 2 files changed, 149 insertions(+), 21 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 21629a8b..b2a245f3 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -575,27 +575,37 @@ pub struct BranchReconcileStats { pub failures: Vec<(String, String)>, } -/// Drop every per-table and commit-graph Lance branch that the manifest no -/// longer references. +/// Drop every per-table and commit-graph Lance branch fork the manifest does +/// not reference. /// -/// Orphaned forks arise when a `branch_delete` flips the manifest authority -/// (atomic) but a downstream best-effort reclaim does not complete. They are -/// unreachable through any snapshot — no manifest entry can name them — yet -/// they pin their `tree/{branch}/` storage and can block reusing the branch -/// name. This is the guaranteed convergence backstop: it is idempotent and -/// derived purely from the manifest authority, so it no-ops once everything is -/// reconciled, and it would harmlessly find nothing if a future Lance atomic -/// multi-dataset branch op prevented orphans from forming. +/// Two origins produce a manifest-unreferenced fork: +/// 1. A `branch_delete` flips the manifest authority (atomic) but a +/// downstream best-effort reclaim does not complete — the whole branch is +/// gone from the manifest, but a `tree/{branch}/` ref lingers. +/// 2. A first-write fork (or a merge fork) creates the branch ref before the +/// manifest publish, then the writer dies / is cancelled — the branch is +/// still a live manifest branch, but the manifest's snapshot of it does +/// not place *this table* on the branch. /// -/// The keep-set is the full (unfiltered) manifest branch list, so system -/// branches' forks are never reclaimed; `main`/default is not a named Lance -/// branch and so is never a candidate. Referencing children are dropped before -/// parents (Lance refuses to delete a referenced parent) by ordering longest -/// branch names first. +/// The write path self-heals (2) on the next write to the table +/// (`reclaim_orphaned_fork_and_refork`); this is the guaranteed-convergence +/// backstop that also covers (1) and any table the write path never revisits. +/// +/// The orphan test is therefore **per-table**, not per-branch-name: a Lance +/// branch `B` on table `T` is an orphan iff `B` is not a live manifest branch +/// at all (origin 1) OR the manifest's branch-`B` snapshot does not place `T` +/// on `B` (origin 2). A legitimately-forked table (`table_branch == Some(B)`) +/// is kept. `main` and internal/system branches are never candidates. Lance +/// refuses to force-delete a branch with referencing descendants, so children +/// are dropped before parents (longest name first). Idempotent and authority- +/// derived: no-ops once reconciled, and degrades to finding nothing if a future +/// Lance atomic multi-dataset branch op prevents orphans from forming. pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; - let keep: HashSet = db + // Live manifest branches: the set whose per-table placements are + // authoritative. A branch absent here is a whole-branch (origin-1) orphan. + let live_branches: HashSet = db .coordinator .read() .await @@ -616,6 +626,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result = HashMap::new(); // Per-table fault isolation: one table's transient failure is recorded and // logged, never aborting the rest of the sweep. @@ -634,7 +647,52 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result = Vec::new(); + for branch in listed { + // `main` is not a named Lance branch; system/internal branches + // (e.g. the schema-apply lock) own legitimate forks — never touch. + if branch == "main" || crate::db::is_internal_system_branch(&branch) { + continue; + } + let is_orphan = if !live_branches.contains(&branch) { + true // origin 1: whole branch gone from the manifest + } else { + // origin 2: live branch, but does the manifest place THIS + // table on it? Resolve (and cache) the branch's snapshot. + if !branch_snapshots.contains_key(&branch) { + match db.snapshot_for_branch(Some(&branch)).await { + Ok(snap) => { + branch_snapshots.insert(branch.clone(), snap); + } + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + table = %table_key, + branch = %branch, + error = %err, + "resolving branch snapshot failed during reconcile; skipping", + ); + stats.failures.push((table_key.clone(), err.to_string())); + continue; + } + } + } + branch_snapshots[&branch] + .entry(&table_key) + .map(|e| e.table_branch.as_deref() != Some(branch.as_str())) + .unwrap_or(true) + }; + if is_orphan { + orphans.push(branch); + } + } + // Children before parents (longest name first) so Lance's referenced- + // parent RefConflict cannot block reclamation. + orphans.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b))); + + for branch in orphans { let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") { Ok(()) => storage.force_delete_branch(&full_path, &branch).await, Err(injected) => Err(injected), @@ -655,9 +713,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result Date: Sun, 14 Jun 2026 19:49:55 +0200 Subject: [PATCH 5/9] fix: reclaim self-validates against fresh manifest authority The fork reclaim force-deletes a Lance branch ref, gated on the caller's proof that the manifest does not place the table on the branch. But the first-write path obtains that proof via snapshot_for_branch, which returns the coordinator's CACHED snapshot when the handle is bound to the branch (an embedded handle on the branch, or branch_merge's target swap). If that snapshot is stale and a concurrent writer already published a legitimate fork, the reclaim would force-delete it and re-fork from source, stranding the manifest at a version the recreated ref no longer has. Make the destructive primitive own its safety precondition: re-derive it from a FRESH manifest read (fresh_snapshot_for_branch, which bypasses the cache) immediately before force-deleting. If fresh authority shows the table is on the branch, refuse with a retryable conflict instead of destroying a valid fork. Correct for any caller regardless of snapshot staleness. Also stop branching on Lance's exact RefConflict prose (loosened match; typed-variant is the durable follow-up). Addresses PR review (Codex P1, Greptile P2). --- .../omnigraph/src/db/omnigraph/table_ops.rs | 54 ++++++++++++++----- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 9e754148..ded761ed 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -547,15 +547,23 @@ pub(super) async fn fork_dataset_from_entry_state( /// Reclaim a manifest-unreferenced fork and re-fork in its place. /// -/// Reached only when `fork_branch_from_state` reported `RefAlreadyExists` -/// AND the caller already proved (live-manifest authority re-check, under the -/// held per-`(table, active_branch)` write queue) that the manifest does not -/// place this table on `active_branch`. So the leftover ref is reclaimable -/// derived state: drop it (idempotent `force_delete_branch`) and re-fork, -/// exactly once. A second collision means a concurrent *foreign-process* -/// writer recreated the ref (in-process is impossible while we hold the -/// queue) — the documented one-winner-CAS gap — so surface a retryable -/// conflict; on retry the winner's fork is visible and the no-fork path runs. +/// Reached when `fork_branch_from_state` reports `RefAlreadyExists`. This is a +/// destructive op (it force-deletes a Lance branch ref), so it owns its own +/// safety precondition rather than trusting the caller's: it re-derives, from +/// a FRESH manifest read, that the manifest does not place this table on +/// `active_branch`. The caller's earlier proof may have come from the +/// coordinator's *cached* branch snapshot (`resolved_branch_target` returns +/// the cache when the handle is bound to `active_branch` — an embedded handle +/// on the branch, or `branch_merge`'s target swap); trusting it could +/// force-delete a fork a concurrent writer just legitimately published. Only +/// once fresh authority confirms the ref is unreferenced does it drop the ref +/// (idempotent `force_delete_branch`) and re-fork, exactly once. +/// +/// If fresh authority shows the table IS on `active_branch` (a legitimate +/// concurrent fork), or a second collision occurs after reclaim (a foreign- +/// process writer recreated the ref — the documented one-winner-CAS gap), it +/// surfaces a retryable conflict; on retry the winner's fork is visible and +/// the no-fork path runs. pub(super) async fn reclaim_orphaned_fork_and_refork( db: &Omnigraph, table_key: &str, @@ -564,16 +572,34 @@ pub(super) async fn reclaim_orphaned_fork_and_refork( source_version: u64, active_branch: &str, ) -> Result { + // Self-validate against FRESH authority before destroying anything. + // `fresh_snapshot_for_branch` bypasses the coordinator cache. If a + // legitimate fork now exists, this ref is not an orphan — refuse (retryable) + // rather than strand the manifest at a version the recreated ref won't have. + let fresh = db.fresh_snapshot_for_branch(Some(active_branch)).await?; + if let Some(entry) = fresh.entry(table_key) { + if entry.table_branch.as_deref() == Some(active_branch) { + return Err(OmniError::manifest_expected_version_mismatch( + table_key, + source_version, + entry.table_version, + )); + } + } + crate::failpoints::maybe_fail("fork.before_reclaim")?; db.storage() .force_delete_branch(full_path, active_branch) .await .map_err(|e| { // Lance refuses to delete a branch with dependent child branches - // even under force (`refs.rs` RefConflict "referenced by N - // versions"). Unreachable for a leaf first-write fork, but surface - // it actionably instead of looping. - if e.to_string().contains("referenced by") { + // even under force (RefConflict). Unreachable for a leaf first-write + // fork (the cleanup reconciler also drops children before parents), + // but surface it actionably if it ever happens. We match loosely on + // "referenc" rather than the exact prose, which is not a Lance API + // contract; a typed RefConflict variant through `force_delete_branch` + // is the durable follow-up. + if e.to_string().contains("referenc") { OmniError::manifest_conflict(format!( "branch '{active_branch}' cannot reclaim the leftover fork for \ table '{table_key}' because it has dependent child branches; \ @@ -596,7 +622,7 @@ pub(super) async fn reclaim_orphaned_fork_and_refork( { crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds), crate::storage_layer::ForkOutcome::RefAlreadyExists => { - let live = db.snapshot_for_branch(Some(active_branch)).await?; + let live = db.fresh_snapshot_for_branch(Some(active_branch)).await?; let actual = live .entry(table_key) .map(|e| e.table_version) From d2bccd90adbaa08a7db4acf586f62cd7c63f7227 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 19:50:05 +0200 Subject: [PATCH 6/9] fix: cover delete-cascade edges in up-front fork-queue acquisition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A node delete cascades to every edge table touching that node (execute_delete_ node), forking those edge tables during execution. But touched_table_keys derived the up-front fork-queue set from the IR ops alone (just node:Type), so a branch delete that forks node + cascade edges held only the node queue — commit_all then saw cascade-edge keys it had no guard for. The touched set is a pure function of (IR ops + catalog), so compute the COMPLETE set: op types plus, for delete-node ops, the cascade edges derived the same way the executor derives them (from_type/to_type match). Pre-computed now equals actual by construction. Also promote commit_all's held-guard coverage check out of debug_assert into an all-builds check that fails the write with a typed manifest_internal error: a load-bearing serialization invariant must fail loudly+safely in release, not silently proceed unguarded if a future execution path ever touches a table outside the pre-computed set. Adds branch_cascade_delete_forks_node_and_edges_under_held_queues, which drives the cascade path on a branch (the gap the existing insert/load tests missed). Addresses PR review (Cursor medium, Greptile P2). --- crates/omnigraph/src/exec/mutation.rs | 60 ++++++++++++++++----------- crates/omnigraph/src/exec/staging.rs | 25 +++++++---- crates/omnigraph/tests/writes.rs | 47 +++++++++++++++++++++ 3 files changed, 99 insertions(+), 33 deletions(-) diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 8fed05db..9fcff450 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -881,34 +881,46 @@ impl Omnigraph { Ok(ir) } - /// The set of `(node|edge):{type}` table keys a mutation IR touches, as - /// they will be keyed in `MutationStaging`/`commit_all`. Mirrors the - /// node-vs-edge dispatch the execute paths use (`node_types` first, then - /// `edge_types`), so the keys match what `commit_all` recomputes. - /// Unknown types are skipped (the execute path surfaces the error); - /// they must not be locked. Sorted + deduped for one-shot - /// `acquire_many`. Used by the up-front fork-queue acquisition. + /// The COMPLETE set of `(node|edge):{type}` table keys a mutation IR can + /// touch at execution time, keyed as `MutationStaging`/`commit_all` key + /// them. Must be a superset of everything execution forks/commits, since + /// it drives the up-front fork-queue acquisition and `commit_all`'s + /// held-guard coverage check — a miss means an unserialized fork/commit. + /// + /// The set is a pure function of (IR ops + catalog). For each op it mirrors + /// the execute path's node-vs-edge dispatch (`node_types` first, then + /// `edge_types`). A `delete ` additionally **cascades** to every edge + /// type whose endpoint is that node (see `execute_delete_node`), forking + /// those edge tables during execution — so they are included here, derived + /// the same way the executor derives them (`from_type`/`to_type` match). + /// Unknown types are skipped (the execute path surfaces the error). + /// Sorted + deduped for one-shot `acquire_many`. fn touched_table_keys(&self, ir: &omnigraph_compiler::ir::MutationIR) -> Vec { use omnigraph_compiler::ir::MutationOpIR; let catalog = self.catalog(); - let mut keys: Vec = ir - .ops - .iter() - .filter_map(|op| { - let type_name = match op { - MutationOpIR::Insert { type_name, .. } - | MutationOpIR::Update { type_name, .. } - | MutationOpIR::Delete { type_name, .. } => type_name, - }; - if catalog.node_types.contains_key(type_name) { - Some(format!("node:{type_name}")) - } else if catalog.edge_types.contains_key(type_name) { - Some(format!("edge:{type_name}")) - } else { - None + let mut keys: Vec = Vec::new(); + for op in &ir.ops { + let type_name = match op { + MutationOpIR::Insert { type_name, .. } + | MutationOpIR::Update { type_name, .. } + | MutationOpIR::Delete { type_name, .. } => type_name, + }; + if catalog.node_types.contains_key(type_name) { + keys.push(format!("node:{type_name}")); + // A node delete cascades to every edge touching this node type, + // forking those edge tables. Include them so the up-front + // acquisition covers the cascade (mirrors execute_delete_node). + if matches!(op, MutationOpIR::Delete { .. }) { + for (edge_name, edge_type) in &catalog.edge_types { + if edge_type.from_type == *type_name || edge_type.to_type == *type_name { + keys.push(format!("edge:{edge_name}")); + } + } } - }) - .collect(); + } else if catalog.edge_types.contains_key(type_name) { + keys.push(format!("edge:{type_name}")); + } + } keys.sort(); keys.dedup(); keys diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index f84b9af2..464ec342 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -527,17 +527,24 @@ impl StagedMutation { } // Reuse the caller's guards (fork path) when handed in, else acquire // our own. When reusing, every key we would acquire MUST already be - // covered — re-acquiring a held non-re-entrant key would deadlock. + // covered — re-acquiring a held non-re-entrant key would deadlock, and + // a key we'd need but DON'T hold would commit unserialized. This is a + // load-bearing safety invariant, so it is checked in ALL builds (not a + // debug_assert) and fails the write loudly+safely rather than silently + // proceeding unguarded if a future execution path ever touches a table + // outside the caller's pre-computed set. let guards = match held_guards { Some((acquired_keys, guards)) => { - debug_assert!( - { - let held: std::collections::HashSet<&(String, Option)> = - acquired_keys.iter().collect(); - queue_keys.iter().all(|k| held.contains(k)) - }, - "commit_all: held_guards must cover every touched-table queue key" - ); + let held: std::collections::HashSet<&(String, Option)> = + acquired_keys.iter().collect(); + if let Some(missing) = queue_keys.iter().find(|k| !held.contains(k)) { + return Err(OmniError::manifest_internal(format!( + "commit_all: pre-held write-queue guards do not cover touched table \ + '{}' on branch {:?} — the caller's up-front acquisition set diverged \ + from the staged/inline set (a touched-table-set bug)", + missing.0, missing.1 + ))); + } guards } None => db.write_queue().acquire_many(&queue_keys).await, diff --git a/crates/omnigraph/tests/writes.rs b/crates/omnigraph/tests/writes.rs index 6a41cd1b..81209407 100644 --- a/crates/omnigraph/tests/writes.rs +++ b/crates/omnigraph/tests/writes.rs @@ -1599,3 +1599,50 @@ async fn first_write_self_heals_manifest_unreferenced_fork_on_live_branch() { (feature={feature_people}, main={main_people})" ); } + +// A node delete cascades to every edge table touching that node, forking those +// edge tables during execution. The up-front fork-queue acquisition must cover +// those cascade-forked edges, not just the node table named in the IR — else +// commit_all's held-guard coverage check fails the write (and, before the +// coverage check was promoted out of debug-only, edge commits would slip +// through unserialized). This drives the new code via a DELETE (the only +// cascading op), on a branch, as the FIRST write (so it actually forks). +#[tokio::test] +async fn branch_cascade_delete_forks_node_and_edges_under_held_queues() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + db.branch_create("feature").await.unwrap(); + + // Baseline inherited from main (Alice has 2 Knows + 1 WorksAt edge). + let main_people = count_rows(&db, "node:Person").await; + let main_knows = count_rows(&db, "edge:Knows").await; + + // First write to `feature` is `delete Person Alice`, whose cascade forks + // node:Person AND edge:Knows + edge:WorksAt. Pre-fix the up-front set held + // only node:Person, so commit_all's coverage check rejected the write. + mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "Alice")], &[]), + ) + .await + .expect("branch cascade-delete must hold queues for cascade-forked edge tables"); + + // Alice and her edges are gone on feature; main is untouched. + assert_eq!( + count_rows_branch(&db, "feature", "node:Person").await, + main_people - 1, + "feature should have Alice removed from the inherited set" + ); + assert!( + count_rows_branch(&db, "feature", "edge:Knows").await < main_knows, + "feature should have Alice's cascade-deleted Knows edges removed" + ); + assert_eq!( + count_rows(&db, "node:Person").await, + main_people, + "main must be untouched by the branch delete" + ); +} From 509a9c28c665b7c9e53a14e98cc7f6f877949a0a Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 20:33:48 +0200 Subject: [PATCH 7/9] fix(cleanup): serialize fork reclaim against in-process live writers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The broadened per-table reconciler force_delete'd an orphan candidate on a LIVE branch without holding the per-(table, branch) write queue. An in-process first-write fork in its fork->publish window holds that queue and has not yet advanced the manifest, so it looks exactly like an origin-2 orphan — concurrent cleanup could delete the ref the writer still holds and is about to publish. (The old branch-name-based reconciler did not have this race: a deleted branch cannot have a live first-write.) Bring the reconciler under the same invariant the write-path reclaim already obeys: never force_delete a fork ref without holding the (table, branch) write queue AND confirming, under it, from a fresh read, that the ref is still manifest-unreferenced. Acquire one key at a time (no lock-order inversion vs multi-table acquire_many writers); if the writer published meanwhile, the fresh re-check sees the table on the branch and skips. Cross-process writers remain the documented one-winner-CAS gap. Addresses PR review (Cursor high). --- crates/omnigraph/src/db/omnigraph/optimize.rs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index fa882d0a..bd44a091 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -717,6 +717,31 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result snap + .entry(&table_key) + .map(|e| e.table_branch.as_deref() != Some(branch.as_str())) + .unwrap_or(true), + // Branch absent from the manifest entirely (origin 1) → orphan. + Err(_) => true, + }; + if !still_orphan { + continue; + } let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") { Ok(()) => storage.force_delete_branch(&full_path, &branch).await, Err(injected) => Err(injected), From 330010937a3ae04ded0c1e15f32157cce463a032 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 20:33:57 +0200 Subject: [PATCH 8/9] fix: classify create_branch failure by ref existence, not by failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fork_branch_from_state mapped ANY create_branch failure to RefAlreadyExists, routing transient I/O / version / Lance-internal errors into the destructive reclaim path and masking the real error as a retryable conflict. Branch on the actual fact instead: on create_branch failure, check whether the ref exists (list_branches). Only a genuinely pre-existing ref — a fully-formed manifest-unreferenced fork — is a reclaim candidate; any other failure propagates with fidelity. We deliberately do NOT force-delete on a not-found-ref failure: it is indistinguishable from a transient error on a fresh create, and force-deleting there is the overreach the fresh-authority guard already removed. A phase-1-only Lance zombie (rarer; create_branch interrupted mid its two internal phases) surfaces as the propagated error for manual reclaim. Addresses PR review (Cursor medium). --- crates/omnigraph/src/table_store.rs | 46 +++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 99c8f1b2..c458af6b 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -295,22 +295,42 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string()))?; self.ensure_expected_version(&source_ds, table_key, source_version)?; - if source_ds + if let Err(create_err) = source_ds .create_branch(target_branch, source_version, None) .await - .is_err() { - // A branch ref for `target_branch` already exists on this table (a - // fully-formed manifest-unreferenced fork, or a phase-1-only Lance - // "zombie" — `create_branch` fails for both). The caller - // (`open_owned_dataset_for_branch_write`) has already re-read the - // live manifest under the held write queue and confirmed it does - // NOT place this table on `target_branch`, so the ref is reclaimable - // derived state, not authoritative. Return the typed signal and let - // the db layer reclaim + re-fork rather than guessing or hard-failing - // here. (A live committed fork is routed to a retryable conflict - // upstream, before we ever reach this fork path.) - return Ok(ForkOutcome::RefAlreadyExists); + // Disambiguate the failure: only a genuinely pre-existing ref is a + // reclaim candidate. Mapping EVERY create_branch failure to + // `RefAlreadyExists` would route a transient I/O / version / Lance + // internal error into the destructive reclaim path. So check whether + // the ref actually exists; if not, the failure is real — propagate + // it (preserving error fidelity) rather than force-deleting. + // + // `list_branches` reads `_refs/branches/` from the store, so it sees + // a fully-formed manifest-unreferenced fork (our common case — a + // create_branch that completed but whose manifest publish did not). + // It does NOT see a phase-1-only Lance "zombie" (tree dir written, + // no BranchContents) — but neither does `cleanup`'s reconciler, also + // list_branches-based. A zombie only forms if create_branch is + // interrupted *between its two internal phases* (a far narrower + // window than the manifest-publish gap), and it surfaces here as the + // propagated create error requiring manual reclaim. We deliberately + // do NOT force-delete on a not-found-ref failure: it is + // indistinguishable from a transient error on a fresh create, and + // force-deleting there is the destructive overreach this guard + // removes. The caller holds the per-(table, branch) write queue, so + // no in-process writer races this fork; a cross-process create + // between our check and now is the documented one-winner-CAS gap and + // propagates as a retryable error. + let ref_exists = source_ds + .list_branches() + .await + .map(|b| b.contains_key(target_branch)) + .unwrap_or(false); + if ref_exists { + return Ok(ForkOutcome::RefAlreadyExists); + } + return Err(OmniError::Lance(create_err.to_string())); } let ds = self From c2f492d4c0701e479bed1f85000cb7f3508393b5 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 14 Jun 2026 20:50:23 +0200 Subject: [PATCH 9/9] fix(cleanup): skip (not delete) on a transient re-check error for a live branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reconcile pre-delete re-check treated ANY fresh_snapshot error as 'still an orphan' and proceeded to force_delete. A transient manifest read failure on a LIVE branch could therefore destroy a fork the manifest still considers legitimate — inconsistent with the write-path reclaim (aborts on the same error) and the candidate scan (skips on snapshot failure). Distinguish the two origins under the queue: a branch absent from the manifest authority (origin 1) is a confirmed orphan and is deleted without a fresh read (no live writer can hold a deleted branch's queue); a LIVE branch (origin 2) gets the fresh re-check and, on a transient read error, is SKIPPED — never destroyed on ambiguity — converging on a later cleanup. Same don't-destroy-on- ambiguous-error principle as the create_branch failure classification. Addresses PR review (Cursor medium). --- crates/omnigraph/src/db/omnigraph/optimize.rs | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index bd44a091..e5102955 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -731,13 +731,39 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result snap - .entry(&table_key) - .map(|e| e.table_branch.as_deref() != Some(branch.as_str())) - .unwrap_or(true), - // Branch absent from the manifest entirely (origin 1) → orphan. - Err(_) => true, + let still_orphan = if !live_branches.contains(&branch) { + // Origin 1: the branch is absent from the manifest authority + // entirely — a confirmed orphan. No live writer can hold this + // branch's queue (you cannot first-write to a branch the + // manifest does not have), so no fresh re-check is needed. + true + } else { + // Origin 2: a LIVE branch whose manifest snapshot does not (yet) + // place this table on it. A fresh read tells us whether an + // in-process writer published a legitimate fork while we waited + // on the queue. On a TRANSIENT read failure we must NOT destroy + // the ref — skip and let a later cleanup converge, matching the + // write-path reclaim (which aborts on the same error). Treating + // a read error as "still orphan" here would let a transient + // manifest hiccup delete a fork the manifest considers live. + match db.fresh_snapshot_for_branch(Some(&branch)).await { + Ok(snap) => snap + .entry(&table_key) + .map(|e| e.table_branch.as_deref() != Some(branch.as_str())) + .unwrap_or(true), + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + table = %table_key, + branch = %branch, + error = %err, + "fresh re-check failed during reconcile; skipping to avoid \ + destroying a possibly-live fork (will retry next cleanup)", + ); + stats.failures.push((table_key.clone(), err.to_string())); + false + } + } }; if !still_orphan { continue;