Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Compaction (`compact_files`) + reindex (`optimize_indices`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; per table runs `compact_files` **then Lance `optimize_indices`** (folds appended/rewritten fragments back into existing indexes — incremental merge, not retrain) and **publishes the resulting version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe the work and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage spanning both ops; **commits even with no compaction work if index coverage is stale**; **refuses on an unrecovered graph**; **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair`; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent; reindex is skipped for them too today), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them per `@index`/`@key` column, dispatched by type via `node_prop_index_kind` (enum + orderable scalar → BTREE, free-text String → FTS, Vector → vector); idempotent; lazy across branches. Coverage of fragments appended after build is restored by `optimize`'s `optimize_indices` pass (see Compaction row). |
| `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering |
| Vector search | ✅ | `nearest()` query op; embedding pipeline (Gemini / OpenAI clients); `@embed` in schema |
| Full-text search | ✅ | `search/fuzzy/match_text/bm25` query ops |
Expand Down
2 changes: 1 addition & 1 deletion crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use lance::dataset::scanner::ColumnOrdering;
use lance::datatypes::BlobKind;
use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::types::ScalarType;
use omnigraph_compiler::types::{PropType, ScalarType};
use omnigraph_compiler::{
DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
build_catalog_from_ir, build_schema_ir, plan_schema_migration,
Expand Down
46 changes: 35 additions & 11 deletions crates/omnigraph/src/db/omnigraph/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{
CompactionMetrics, CompactionOptions, compact_files, plan_compaction,
};
use lance::index::DatasetIndexExt;
use lance_index::optimize::OptimizeOptions;

use super::*;

Expand Down Expand Up @@ -361,25 +363,32 @@ async fn optimize_one_table(
}

// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment). A
// steady-state already-compacted table yields an empty plan and is never
// pinned in a sidecar (a zero-commit pin would classify NoMovement on
// recovery and force an all-or-nothing rollback). Uncovered pre-existing
// drift is skipped above and must go through explicit repair.
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if plan.num_tasks() == 0 {
let will_compact = plan.num_tasks() > 0;
// Even when there is nothing to compact, the table may still have index
// work: rows appended since the index was built (e.g. via `ingest --mode
// merge`) are scanned unindexed until folded in. Either compaction or stale
// index coverage is enough to enter the publish path. If NEITHER, this
// table is a no-op and must NOT be pinned in a sidecar — a zero-commit pin
// classifies NoMovement on recovery and forces an all-or-nothing rollback
// of sibling tables' legitimate work. Uncovered pre-existing manifest/HEAD
// drift is skipped above and must go through explicit repair.
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
if !will_compact && !needs_reindex {
return Ok(TableOptimizeStats::compacted(
table_key,
&CompactionMetrics::default(),
false,
));
}

// Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a
// crash before the manifest publish rolls forward on next open.
// Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or
// index optimize), so a crash before the manifest publish rolls forward on
// next open.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
Expand All @@ -398,11 +407,26 @@ async fn optimize_one_table(
let handle =
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;

// Phase B: compaction (reserve-fragments + rewrite commits advance HEAD).
// Phase B: compaction (if any) then incremental index optimize — both
// advance Lance HEAD inside the sidecar window. `compact_files` rewrites
// fragments and drops them from existing index segments' coverage;
// `optimize_indices` folds the rewritten and any previously-unindexed
// fragments back in (Lance's incremental merge, not a full retrain). This
// is the same compact -> optimize_indices sequencing LanceDB's `optimize()`
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
// exposes no uncommitted variant, so like `compact_files` it commits
// directly and relies on the sidecar for recovery.
let version_before = ds.version().version;
let metrics: CompactionMetrics = compact_files(&mut ds, options, None)
let metrics: CompactionMetrics = if will_compact {
compact_files(&mut ds, options, None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
} else {
CompactionMetrics::default()
};
ds.optimize_indices(&OptimizeOptions::default())
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
.map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
let version_after = ds.version().version;
let committed = version_after != version_before;
Comment on lines +427 to 431

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 optimize_indices called unconditionally even when needs_reindex = false

When will_compact = true but needs_reindex = false, the function enters the sidecar path, compaction runs and rewrites fragments (dropping them from existing index coverage), and then optimize_indices folds them back in — this sequencing is correct. However, if the table has no non-system indexes at all, optimize_indices still makes a Lance API call (a no-op) on every compaction cycle. A guard checking !ds.load_indices().await?.iter().any(|i| !is_system_index(i)) before calling optimize_indices would avoid the spurious round-trip, though this is a latency concern only.

Fix in Claude Code

Comment on lines 380 to 431

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Sidecar written without guaranteed HEAD advance on the reindex-only path

The original code gated sidecar creation on plan.num_tasks() > 0, guaranteeing compact_files advances Lance HEAD at least once — so a stranded sidecar (Phase D cleanup failure) always recovers forward, not as NoMovement. The new needs_reindex entry point doesn't carry that guarantee: if a concurrent optimize already ran optimize_indices between the has_unindexed_fragments check and the call here, optimize_indices commits nothing, committed = false, Phase C is skipped, and the sidecar is left on disk if Phase D cleanup fails.

Recovery classifies this as "not yet committed" (dataset at V < post_commit_pin V+1) and takes the rollback path (Dataset::restore(V) + manifest publish V). Since V is the current version the restore is a data no-op, but the manifest CAS can conflict if concurrent writes advanced the table in the window — requiring manual omnigraph repair.

Fix in Claude Code


Expand Down
132 changes: 99 additions & 33 deletions crates/omnigraph/src/db/omnigraph/table_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,48 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
Ok(())
}

/// The single scalar/vector index a node property receives from a one-column
/// `@index`/`@key` declaration, or `None` when the property type is not
/// indexable here (a list column or `Blob`).
///
/// Shared by `build_indices_on_dataset_for_catalog` (which builds the index)
/// and `needs_index_work_node` (which checks coverage to decide recovery-
/// sidecar pinning) so the two cannot drift: an enum or orderable scalar the
/// builder gives a BTREE must also be reported as "needs work" until that
/// BTREE exists, or the HEAD-advancing build would run without sidecar cover.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodePropIndexKind {
Btree,
Fts,
Vector,
}

fn node_prop_index_kind(prop_type: &PropType) -> Option<NodePropIndexKind> {
if prop_type.list {
return None;
}
// Enums are physically `String` but filtered by equality, so they take a
// scalar BTREE, not an FTS inverted index (Lance never consults an inverted
// index for `=`/range). Free-text Strings keep FTS for
// `search()`/`match_text`/`bm25`.
let is_enum = prop_type.enum_values.is_some();
match prop_type.scalar {
ScalarType::String if !is_enum => Some(NodePropIndexKind::Fts),
ScalarType::Vector(_) => Some(NodePropIndexKind::Vector),
ScalarType::String
| ScalarType::DateTime
| ScalarType::Date
| ScalarType::I32
| ScalarType::I64
| ScalarType::U32
| ScalarType::U64
| ScalarType::F32
| ScalarType::F64
| ScalarType::Bool => Some(NodePropIndexKind::Btree),
ScalarType::Blob => None,
}
}

/// Returns true if the node table is missing at least one declared
/// scalar/vector index that `build_indices_on_dataset_for_catalog` would
/// build AND has at least one row (the ensure_indices loop has
Expand All @@ -318,11 +360,12 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
/// would force `NoMovement` classification on recovery and trigger the
/// all-or-nothing rollback of sibling tables' legitimate index work).
///
/// Per the actual `build_indices_on_dataset_for_catalog` implementation
/// (this file, ~line 419-491), nodes get BTree (id) + per-prop FTS
/// (@search String) + per-prop Vector indices; edges get BTree only
/// (id, src, dst). The two helpers mirror that asymmetry — see the
/// `needs_index_work_edge` doc comment.
/// Per `build_indices_on_dataset_for_catalog`, nodes get BTree (id) plus, for
/// each one-column `@index`/`@key` property, the index `node_prop_index_kind`
/// assigns: a scalar BTREE for enums and orderable scalars
/// (DateTime/Date/numeric/Bool), FTS for free-text Strings, or a Vector index.
/// Edges get BTree only (id, src, dst). This helper and the builder share
/// `node_prop_index_kind` so they cannot drift — see its doc comment.
async fn needs_index_work_node(
db: &Omnigraph,
type_name: &str,
Expand Down Expand Up @@ -359,14 +402,23 @@ async fn needs_index_work_node(
let Some(prop_type) = node_type.properties.get(prop_name) else {
continue;
};
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.storage().has_fts_index(&ds, prop_name).await? {
return Ok(true);
match node_prop_index_kind(prop_type) {
Some(NodePropIndexKind::Fts) => {
if !db.storage().has_fts_index(&ds, prop_name).await? {
return Ok(true);
}
}
Some(NodePropIndexKind::Vector) => {
if !db.storage().has_vector_index(&ds, prop_name).await? {
return Ok(true);
}
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.storage().has_vector_index(&ds, prop_name).await? {
return Ok(true);
Some(NodePropIndexKind::Btree) => {
if !db.storage().has_btree_index(&ds, prop_name).await? {
return Ok(true);
}
}
None => {}
}
}
Ok(false)
Expand Down Expand Up @@ -615,30 +667,44 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
}
let prop_name = &index_cols[0];
if let Some(prop_type) = node_type.properties.get(prop_name) {
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.storage().has_fts_index(ds, prop_name).await? {
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
.await?;
match node_prop_index_kind(prop_type) {
Some(NodePropIndexKind::Fts) => {
if !db.storage().has_fts_index(ds, prop_name).await? {
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
.await?;
}
}
Some(NodePropIndexKind::Vector) => {
if !db.storage().has_vector_index(ds, prop_name).await? {
// Inline-commit residual: lance-6.0.1 does not
// expose `build_index_metadata_from_segments` as
// `pub`, so vector indices cannot be staged from
// outside the lance crate. Document at the call
// site; companion ticket to lance-format/lance#6658.
let new_snap = db
.storage_inline_residual()
.create_vector_index(ds.clone(), prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
"create Vector index on {}({}): {}",
table_key, prop_name, e
))
})?;
*ds = new_snap;
}
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.storage().has_vector_index(ds, prop_name).await? {
// Inline-commit residual: lance-6.0.1 does not
// expose `build_index_metadata_from_segments` as
// `pub`, so vector indices cannot be staged from
// outside the lance crate. Document at the call
// site; companion ticket to lance-format/lance#6658.
let new_snap = db
.storage_inline_residual()
.create_vector_index(ds.clone(), prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
"create Vector index on {}({}): {}",
table_key, prop_name, e
))
})?;
*ds = new_snap;
// Enum + orderable scalars (DateTime/Date/numeric/Bool)
// get a BTREE so `=`, range, IN, and IS NULL are index-
// accelerated instead of degrading to a full scan.
Some(NodePropIndexKind::Btree) => {
if !db.storage().has_btree_index(ds, prop_name).await? {
stage_and_commit_btree(db, table_key, ds, &[prop_name.as_str()])
.await?;
}
}
// List or Blob column: not indexable as a scalar here.
None => {}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/omnigraph/src/exec/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ fn evaluate_expr(batch: &RecordBatch, expr: &IRExpr, params: &ParamMap) -> Resul
}

/// Create a constant array from a literal value.
fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
///
/// `pub(super)` so the pushdown arm (`query.rs::literal_to_typed_expr`) can build
/// a literal in the same natural Arrow type and cast it to the column type through
/// the identical `arrow_cast` path used here, keeping the two filter arms in sync.
pub(super) fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
Ok(match lit {
Literal::Null => arrow_array::new_null_array(&DataType::Utf8, num_rows),
Literal::String(s) => Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef,
Expand Down
Loading
Loading