Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions crates/omnigraph-server/examples/bench_concurrent_http.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline).
//!
//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP
//! server. Measures the global `Arc<RwLock<Omnigraph>>` lock penalty on
//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline.
//! server. Originally written to measure the global `Arc<RwLock<Omnigraph>>`
//! lock penalty as an MR-686 baseline; that lock has since been removed
//! (engine write APIs are `&self`, the server holds a lockless
//! `Arc<Omnigraph>`), so this now measures the concurrent write path itself
//! (per-`(table, branch)` queue contention + Lance I/O).
//!
//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as`
//! is `&mut self`, so an engine-level concurrent bench either serializes on the
//! borrow checker (measures nothing) or drives multiple handles (measures Lance
//! contention, not the server bottleneck). Driving the HTTP server is the only
//! way to measure the actual `RwLock<Omnigraph>` contention this work removes.
//! Driving the HTTP server is still the right level: an engine-level bench on
//! a single handle measures Lance contention, not the server's request-path
//! concurrency.
//!
//! Usage:
//! ```sh
Expand Down
8 changes: 4 additions & 4 deletions crates/omnigraph/src/db/manifest/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,10 @@ pub(crate) fn schema_apply_serial_queue_key() -> crate::db::write_queue::TableQu
/// same table append extra Lance restore commits which `omnigraph
/// cleanup` reclaims.
///
/// Concurrency: today recovery runs synchronously in `Omnigraph::open`
/// *before* the engine is wrapped in the server's `Arc<RwLock<Omnigraph>>`.
/// No request handlers can race, so this sweep does NOT acquire write
/// queues. In-process callers (refresh, write entry points) must use
/// Concurrency: the open-time sweep runs synchronously in `Omnigraph::open`
/// before the engine handle is published to any caller, so no request
/// handler can race it and it does NOT acquire write queues. In-process
/// callers (refresh, write entry points) must use
/// [`heal_pending_sidecars_roll_forward`] instead, which serializes
/// against live writers via per-(table_key, branch) queue acquisition.
pub(crate) async fn recover_manifest_drift(
Expand Down
34 changes: 28 additions & 6 deletions crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ pub struct Omnigraph {
/// Read-heavy on schema introspection paths, written only by
/// `apply_schema`. Same ArcSwap rationale as `catalog`.
schema_source: Arc<ArcSwap<String>>,
/// Per-`(table_key, branch)` writer queues. Reachable from engine
/// internals (mutation finalize, schema_apply, branch_merge,
/// ensure_indices, delete_where) and from future MR-870 recovery
/// reconciler. PR 1b adds the field; callers acquire in commits 4+.
/// Per-`(table_key, branch)` writer queues — the engine's
/// write-serialization mechanism (the server holds the engine as a
/// lockless `Arc<Omnigraph>`). Reachable from engine internals
/// (mutation finalize, schema_apply, branch_merge, ensure_indices,
/// delete_where, the fork path, recovery reconciler).
write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
/// Process-wide mutex held across the swap → operate → restore window
/// in `branch_merge_impl`. Two concurrent merges with distinct targets
Expand Down Expand Up @@ -1479,6 +1480,13 @@ impl Omnigraph {
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
}

/// Fork `table_key` onto `active_branch` from the given source state,
/// self-healing a manifest-unreferenced leftover fork if one is in the
/// way. Callers that reach this MUST already hold the per-`(table_key,
/// active_branch)` write queue (so the reclaim cannot race an in-process
/// fork) and must have confirmed via the live manifest that the table is
/// not yet on `active_branch`. Both the first-write fork path
/// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this.
pub(crate) async fn fork_dataset_from_entry_state(
&self,
table_key: &str,
Expand All @@ -1487,15 +1495,29 @@ impl Omnigraph {
source_version: u64,
active_branch: &str,
) -> Result<SnapshotHandle> {
table_ops::fork_dataset_from_entry_state(
match table_ops::fork_dataset_from_entry_state(
self,
table_key,
full_path,
source_branch,
source_version,
active_branch,
)
.await
.await?
{
crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
crate::storage_layer::ForkOutcome::RefAlreadyExists => {
table_ops::reclaim_orphaned_fork_and_refork(
Comment on lines +1509 to +1510

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Re-read branch authority before reclaiming forks

This reclaim trusts the caller's proof that the manifest does not place the table on active_branch, but the first-write path obtains that proof with snapshot_for_branch, which can return the coordinator's cached snapshot when the handle is currently bound to that branch. During the branch-merge target-branch swap (or any stale branch-bound handle), another writer may already have published a legitimate fork; create_branch then reports RefAlreadyExists and this path force-deletes/re-forks that valid table branch before commit_all notices the manifest/head mismatch, leaving the manifest pointing at a deleted fork. Re-open a fresh branch snapshot immediately before reclaiming, or treat RefAlreadyExists as retryable unless the fresh manifest proves the ref is unreferenced.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in fedda78. reclaim_orphaned_fork_and_refork now re-derives its precondition from a FRESH manifest read (fresh_snapshot_for_branch, which bypasses the coordinator cache) immediately before force-deleting, and refuses with a retryable conflict if the table is legitimately on the branch. Correct regardless of caller snapshot staleness (including the branch-merge target swap). Good catch.

self,
table_key,
full_path,
source_branch,
source_version,
active_branch,
)
.await
}
}
}

pub(crate) async fn reopen_for_mutation(
Expand Down
151 changes: 130 additions & 21 deletions crates/omnigraph/src/db/omnigraph/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,27 +599,37 @@ pub struct BranchReconcileStats {
pub failures: Vec<(String, String)>,
}

/// Drop every per-table and commit-graph Lance branch that the manifest no
/// longer references.
/// Drop every per-table and commit-graph Lance branch fork the manifest does
/// not reference.
///
/// Orphaned forks arise when a `branch_delete` flips the manifest authority
/// (atomic) but a downstream best-effort reclaim does not complete. They are
/// unreachable through any snapshot — no manifest entry can name themyet
/// they pin their `tree/{branch}/` storage and can block reusing the branch
/// name. This is the guaranteed convergence backstop: it is idempotent and
/// derived purely from the manifest authority, so it no-ops once everything is
/// reconciled, and it would harmlessly find nothing if a future Lance atomic
/// multi-dataset branch op prevented orphans from forming.
/// Two origins produce a manifest-unreferenced fork:
/// 1. A `branch_delete` flips the manifest authority (atomic) but a
/// downstream best-effort reclaim does not completethe whole branch is
/// gone from the manifest, but a `tree/{branch}/` ref lingers.
/// 2. A first-write fork (or a merge fork) creates the branch ref before the
/// manifest publish, then the writer dies / is cancelled — the branch is
/// still a live manifest branch, but the manifest's snapshot of it does
/// not place *this table* on the branch.
///
/// The keep-set is the full (unfiltered) manifest branch list, so system
/// branches' forks are never reclaimed; `main`/default is not a named Lance
/// branch and so is never a candidate. Referencing children are dropped before
/// parents (Lance refuses to delete a referenced parent) by ordering longest
/// branch names first.
/// The write path self-heals (2) on the next write to the table
/// (`reclaim_orphaned_fork_and_refork`); this is the guaranteed-convergence
/// backstop that also covers (1) and any table the write path never revisits.
///
/// The orphan test is therefore **per-table**, not per-branch-name: a Lance
/// branch `B` on table `T` is an orphan iff `B` is not a live manifest branch
/// at all (origin 1) OR the manifest's branch-`B` snapshot does not place `T`
/// on `B` (origin 2). A legitimately-forked table (`table_branch == Some(B)`)
/// is kept. `main` and internal/system branches are never candidates. Lance
/// refuses to force-delete a branch with referencing descendants, so children
/// are dropped before parents (longest name first). Idempotent and authority-
/// derived: no-ops once reconciled, and degrades to finding nothing if a future
/// Lance atomic multi-dataset branch op prevents orphans from forming.
pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconcileStats> {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

let keep: HashSet<String> = db
// Live manifest branches: the set whose per-table placements are
// authoritative. A branch absent here is a whole-branch (origin-1) orphan.
let live_branches: HashSet<String> = db
.coordinator
.read()
.await
Expand All @@ -640,6 +650,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
.collect();

let mut stats = BranchReconcileStats::default();
// Per-branch snapshots are resolved once and cached across tables (few
// branches in practice); origin-2 detection consults the branch's own view.
let mut branch_snapshots: HashMap<String, crate::db::Snapshot> = HashMap::new();

// Per-table fault isolation: one table's transient failure is recorded and
// logged, never aborting the rest of the sweep.
Expand All @@ -658,7 +671,103 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
continue;
}
};
for branch in orphan_branches(listed, &keep) {

// Decide per (table, branch) whether the fork is an orphan.
let mut orphans: Vec<String> = Vec::new();
for branch in listed {
// `main` is not a named Lance branch; system/internal branches
// (e.g. the schema-apply lock) own legitimate forks — never touch.
if branch == "main" || crate::db::is_internal_system_branch(&branch) {
continue;
}
let is_orphan = if !live_branches.contains(&branch) {
true // origin 1: whole branch gone from the manifest
} else {
// origin 2: live branch, but does the manifest place THIS
// table on it? Resolve (and cache) the branch's snapshot.
if !branch_snapshots.contains_key(&branch) {
match db.snapshot_for_branch(Some(&branch)).await {
Ok(snap) => {
branch_snapshots.insert(branch.clone(), snap);
}
Err(err) => {
tracing::warn!(
target: "omnigraph::cleanup",
table = %table_key,
branch = %branch,
error = %err,
"resolving branch snapshot failed during reconcile; skipping",
);
stats.failures.push((table_key.clone(), err.to_string()));
continue;
}
}
}
branch_snapshots[&branch]
.entry(&table_key)
.map(|e| e.table_branch.as_deref() != Some(branch.as_str()))
.unwrap_or(true)
};
if is_orphan {
orphans.push(branch);
}
}
// Children before parents (longest name first) so Lance's referenced-
// parent RefConflict cannot block reclamation.
orphans.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));

for branch in orphans {
Comment thread
cursor[bot] marked this conversation as resolved.
// Serialize against in-process live writers before destroying a ref.
// A first-write fork holds the per-(table, branch) write queue from
// before the fork through the manifest publish; on a LIVE branch its
// in-flight fork looks exactly like an origin-2 orphan (manifest not
// yet advanced). Acquire the same queue so cleanup waits for any such
// writer, then RE-VALIDATE under the queue with a fresh read: if the
// writer published in the meantime (table now placed on the branch),
// it is no longer an orphan — skip it. (Cross-process writers remain
// the documented one-winner-CAS gap.) One key held at a time → no
// lock-order inversion against multi-table `acquire_many` writers.
let _guard = db
.write_queue()
.acquire(&(table_key.clone(), Some(branch.clone())))
.await;
let still_orphan = if !live_branches.contains(&branch) {
// Origin 1: the branch is absent from the manifest authority
// entirely — a confirmed orphan. No live writer can hold this
// branch's queue (you cannot first-write to a branch the
// manifest does not have), so no fresh re-check is needed.
true
Comment on lines +734 to +739

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Stale live_branches makes origin-1 still_orphan = true unsafe

live_branches is a snapshot taken once at the top of reconcile_orphaned_branches (from the coordinator's cached view). The comment "you cannot first-write to a branch the manifest does not have" is only true at that moment. If a branch is created after live_branches is captured, it is absent from the set even though the manifest now has it and an in-process writer may already be holding the (table, branch) write queue through its fork+publish. The reconciler blocks on the queue, the writer publishes (the fork is now legitimate), the reconciler acquires the queue — and then !live_branches.contains(&branch) is still true (stale), so it treats the legitimately-published fork as an origin-1 orphan and calls force_delete_branch. The result is the manifest referencing a non-existent Lance branch ref, corrupting subsequent reads on that (table, branch) pair until repaired.

The origin-2 path correctly defends against this by calling fresh_snapshot_for_branch under the queue. Origin-1 needs the same treatment: if the fresh read returns "branch not found", it is confirmed absent (still an orphan); if the read succeeds, re-check whether the table is placed on it and skip if so; on transient error, skip (matching the origin-2 safe-skip behaviour).

Fix in Claude Code

} else {
// Origin 2: a LIVE branch whose manifest snapshot does not (yet)
// place this table on it. A fresh read tells us whether an
// in-process writer published a legitimate fork while we waited
// on the queue. On a TRANSIENT read failure we must NOT destroy
// the ref — skip and let a later cleanup converge, matching the
// write-path reclaim (which aborts on the same error). Treating
// a read error as "still orphan" here would let a transient
// manifest hiccup delete a fork the manifest considers live.
match db.fresh_snapshot_for_branch(Some(&branch)).await {
Ok(snap) => snap
.entry(&table_key)
.map(|e| e.table_branch.as_deref() != Some(branch.as_str()))
.unwrap_or(true),
Err(err) => {
tracing::warn!(
target: "omnigraph::cleanup",
table = %table_key,
branch = %branch,
error = %err,
"fresh re-check failed during reconcile; skipping to avoid \
destroying a possibly-live fork (will retry next cleanup)",
);
stats.failures.push((table_key.clone(), err.to_string()));
false
}
}
};
Comment thread
cursor[bot] marked this conversation as resolved.
if !still_orphan {
continue;
}
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
Err(injected) => Err(injected),
Expand All @@ -679,9 +788,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
}
}

// Commit-graph orphans (best-effort: the dataset may not exist on a graph
// that has never committed; any failure is isolated and retried next time).
if let Err(err) = reconcile_commit_graph_orphans(db, &keep, &mut stats).await {
// Commit-graph orphans are whole-branch (not per-table), so the simple
// "branch name not in the live set" test still applies there.
if let Err(err) = reconcile_commit_graph_orphans(db, &live_branches, &mut stats).await {
tracing::warn!(
target: "omnigraph::cleanup",
error = %err,
Expand Down
8 changes: 4 additions & 4 deletions crates/omnigraph/src/db/omnigraph/schema_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,10 @@ where
// manifest publish via `commit_changes_with_actor` below.
//
// Schema-apply already holds the graph-wide `__schema_apply_lock__`
// sentinel branch, so under PR 1b's intermediate state these
// per-table acquisitions are uncontended. They exist for symmetry
// with future MR-870 recovery, which will need queue acquisition
// before any `Dataset::restore` it issues for SchemaApply sidecars.
// sentinel branch, so these per-table acquisitions are uncontended in
// practice. They exist for symmetry with the recovery reconciler, which
// acquires the same queues before any `Dataset::restore` it issues for
// SchemaApply sidecars.
let mut schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
.iter()
.map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
Expand Down
Loading
Loading