From b5b14ea82be01004e43824ad8609ade87b8e2690 Mon Sep 17 00:00:00 2001
From: Daniel Rammer
Date: Wed, 10 Jun 2026 15:54:23 -0500
Subject: [PATCH] feat(mem-wal): snapshot-consistent as-of cut for fresh-tier membership

Add `AsOfCut { active_generation, active_batch_count }` and
`LsmScanner::contains_pks_as_of` (with `fresh_tier_block_list` threading
the cut) so a caller can evaluate fresh-tier PK membership against the
exact tier a prior scan observed, instead of the live tier. The active
memtable is the only fresh-tier source that grows between two reads;
everything strictly below its generation (frozen memtables, flushed
generations) is immutable. The cut includes lower generations whole,
bounds the active generation to its first `active_batch_count` batches
(by append index), and excludes higher generations and flushed
generations >= active_generation. This uses only the batch count and
generation -- both always available -- and only ever excludes rows the
scan did not observe, so a stale cut under-counts (a tolerable stale
read) rather than dropping a row. Sophon's WAL block-list uses this to
pin its two-phase supersession check to the snapshot the read arm
scanned, closing a cross-arm transient missing-row.

Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/dataset/mem_wal/scanner.rs | 2 +- .../src/dataset/mem_wal/scanner/block_list.rs | 244 +++++++++++++++++- .../src/dataset/mem_wal/scanner/builder.rs | 16 +- .../dataset/mem_wal/scanner/data_source.rs | 28 ++ 4 files changed, 282 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/dataset/mem_wal/scanner.rs b/rust/lance/src/dataset/mem_wal/scanner.rs index b1766f8525f..0a18c29aae3 100644 --- a/rust/lance/src/dataset/mem_wal/scanner.rs +++ b/rust/lance/src/dataset/mem_wal/scanner.rs @@ -47,7 +47,7 @@ pub use builder::LsmScanner; pub use collector::{ ActiveMemTableRef, InMemoryMemTableRef, InMemoryMemTables, LsmDataSourceCollector, }; -pub use data_source::{FlushedGeneration, LsmDataSource, LsmGeneration, ShardSnapshot}; +pub use data_source::{AsOfCut, FlushedGeneration, LsmDataSource, LsmGeneration, ShardSnapshot}; pub use flushed_cache::FlushedMemTableCache; pub use fts_search::{LsmFtsSearchPlanner, SCORE_COLUMN}; pub use point_lookup::LsmPointLookupPlanner; diff --git a/rust/lance/src/dataset/mem_wal/scanner/block_list.rs b/rust/lance/src/dataset/mem_wal/scanner/block_list.rs index 684fde48da1..0a81a5b5282 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/block_list.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/block_list.rs @@ -21,7 +21,7 @@ use lance_core::Result; use uuid::Uuid; -use super::data_source::{LsmDataSource, LsmGeneration}; +use super::data_source::{AsOfCut, LsmDataSource, LsmGeneration}; use super::exec::{compute_pk_hash, resolve_pk_indices}; use super::flushed_cache::{FlushedMemTableCache, open_flushed_dataset}; use crate::dataset::Dataset; @@ -109,20 +109,71 @@ pub async fn compute_source_block_lists( /// (from the cache). Same `Vec>>` shape the vector-search filter /// consumes; a base/external reader can drop any row whose PK is in one of them. /// The base source, if present, is skipped (it is what gets shadowed). 
+/// +/// When `as_of` carries a cut for a source's shard, membership is bounded to +/// that snapshot (see [`AsOfCut`]): in-memory sources at a generation above +/// `active_generation` are excluded (they appeared after the snapshot); the +/// active generation is bounded to its first `active_batch_count` batches; +/// everything below the active generation (frozen and flushed) is immutable and +/// included whole. A shard absent from `as_of` (or `as_of == None`) uses the +/// live tier. The bound only excludes rows, so it can under-count (a tolerable +/// stale read) but never over-count. pub async fn fresh_tier_block_list( sources: &[LsmDataSource], pk_columns: &[String], session: Option<&Arc>, flushed_cache: Option<&Arc>, + as_of: Option<&HashMap>, ) -> Result>>> { let mut sets = Vec::new(); for source in sources { let set = match source { LsmDataSource::BaseTable { .. } => continue, - LsmDataSource::ActiveMemTable { batch_store, .. } => { - Arc::new(pk_hashes_from_batch_store(batch_store, pk_columns)?) - } - LsmDataSource::FlushedMemTable { path, .. } => { + LsmDataSource::ActiveMemTable { + batch_store, + shard_id, + generation, + .. + } => match as_of.and_then(|m| m.get(shard_id)) { + Some(cut) => { + let g = generation.as_u64(); + if g > cut.active_generation { + // A newer in-memory generation that rolled in after the + // snapshot — not observed by the scan arm. + continue; + } + // The active generation grew between the arm and now; bound + // it to the batches the arm saw. Lower (frozen) generations + // are immutable, so all their batches were observed. + let limit = if g == cut.active_generation { + cut.active_batch_count as usize + } else { + usize::MAX + }; + Arc::new(pk_hashes_from_batch_store_bounded( + batch_store, + pk_columns, + limit, + )?) + } + None => Arc::new(pk_hashes_from_batch_store(batch_store, pk_columns)?), + }, + LsmDataSource::FlushedMemTable { + path, + shard_id, + generation, + .. 
+ } => { + // A flushed generation at or above the active generation was + // produced by a flush after the snapshot (and may hold rows the + // arm never saw), so it must not contribute. Lower generations + // are immutable and were fully observed. + let flushed_after_snapshot = as_of + .and_then(|m| m.get(shard_id)) + .is_some_and(|cut| generation.as_u64() >= cut.active_generation); + if flushed_after_snapshot { + continue; + } flushed_pk_hashes(path, pk_columns, session, flushed_cache).await? } }; @@ -148,6 +199,26 @@ pub fn pk_hashes_from_batch_store( pk_hashes_from_batches(&batches, pk_columns) } +/// As-of variant of [`pk_hashes_from_batch_store`]: includes only the first +/// `limit` batches (by append index) — the batches a prior scan observed before +/// the memtable grew. `usize::MAX` includes all (an immutable lower generation). +/// Bounding the membership to what the arm saw keeps the block-list from +/// dropping a base row whose replacement the arm did not deliver. +pub fn pk_hashes_from_batch_store_bounded( + store: &BatchStore, + pk_columns: &[String], + limit: usize, +) -> Result> { + let end = store.len().min(limit); + let mut batches: Vec = Vec::with_capacity(end); + for i in 0..end { + if let Some(stored) = store.get(i) { + batches.push(stored.data.clone()); + } + } + pk_hashes_from_batches(&batches, pk_columns) +} + /// Hash every row's primary key across `batches` into a membership set. fn pk_hashes_from_batches(batches: &[RecordBatch], pk_columns: &[String]) -> Result> { let mut pk_hashes = HashSet::new(); @@ -296,7 +367,7 @@ mod tests { // Active gen 2: pk=1,2. Frozen gen 1: pk=3. 
let sources = vec![mk(&[1, 2], 2), mk(&[3], 1)]; - let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None) + let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, None) .await .unwrap(); @@ -457,4 +528,165 @@ mod tests { assert!(!blocked.contains_key(&(Some(a), g2))); assert!(!blocked.contains_key(&(Some(b), g2))); } + + /// An as-of cut bounds the active generation to the first + /// `active_batch_count` batches — the ones the scan arm observed before the + /// memtable grew. A later append (higher batch index) is invisible to the + /// check, so a base row is never dropped without the arm having delivered + /// its replacement. + #[tokio::test] + async fn as_of_cut_bounds_active_memtable_by_batch_count() { + use crate::dataset::mem_wal::scanner::data_source::{ + AsOfCut, LsmDataSource, LsmGeneration, + }; + use crate::dataset::mem_wal::write::IndexStore; + use std::collections::HashMap; + + let shard = Uuid::new_v4(); + // Three single-row batches: indices 0, 1, 2. + let store = BatchStore::with_capacity(8); + store.append(id_batch(&[1])).unwrap(); // batch 0 + store.append(id_batch(&[2])).unwrap(); // batch 1 + store.append(id_batch(&[3])).unwrap(); // batch 2 (appended after the arm) + let sources = vec![LsmDataSource::ActiveMemTable { + batch_store: Arc::new(store), + index_store: Arc::new(IndexStore::new()), + schema: id_batch(&[1]).schema(), + shard_id: shard, + generation: LsmGeneration::memtable(1), + }]; + + // Cut at 2 batches of gen 1: pk=1,2 are members; pk=3 (batch 2) is not. + let as_of: HashMap = [( + shard, + AsOfCut { + active_generation: 1, + active_batch_count: 2, + }, + )] + .into_iter() + .collect(); + let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&as_of)) + .await + .unwrap(); + assert!(blocks(&sets, 1)); + assert!(blocks(&sets, 2)); + assert!(!blocks(&sets, 3)); + + // No cut → live tier: all three are members. 
+ let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, None) + .await + .unwrap(); + for id in [1, 2, 3] { + assert!(blocks(&sets, id)); + } + } + + /// A generation above the active generation rolled in after the snapshot and + /// is excluded whole; a generation below it is immutable (frozen) and + /// included whole regardless of the active batch count. + #[tokio::test] + async fn as_of_cut_excludes_newer_gen_includes_lower_gen() { + use crate::dataset::mem_wal::scanner::data_source::{ + AsOfCut, LsmDataSource, LsmGeneration, + }; + use crate::dataset::mem_wal::write::IndexStore; + use std::collections::HashMap; + + let shard = Uuid::new_v4(); + let mk = |ids: &[i32], generation: u64| { + let store = BatchStore::with_capacity(8); + for id in ids { + store.append(id_batch(&[*id])).unwrap(); + } + LsmDataSource::ActiveMemTable { + batch_store: Arc::new(store), + index_store: Arc::new(IndexStore::new()), + schema: id_batch(&[1]).schema(), + shard_id: shard, + generation: LsmGeneration::memtable(generation), + } + }; + // gen 3 newer (after snapshot), gen 2 == active (bounded to 1 batch), + // gen 1 lower/immutable (whole). + let sources = vec![mk(&[100], 3), mk(&[20, 21], 2), mk(&[1, 2], 1)]; + + let as_of: HashMap = [( + shard, + AsOfCut { + active_generation: 2, + active_batch_count: 1, + }, + )] + .into_iter() + .collect(); + let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&as_of)) + .await + .unwrap(); + assert!(blocks(&sets, 1)); // gen 1, whole + assert!(blocks(&sets, 2)); // gen 1, whole + assert!(blocks(&sets, 20)); // gen 2, batch 0 + assert!(!blocks(&sets, 21)); // gen 2, batch 1 — past the cut + assert!(!blocks(&sets, 100)); // gen 3 — after the snapshot + } + + /// A flushed generation at or above the active generation was produced by a + /// flush after the snapshot and is excluded; one strictly below it is + /// immutable and included. 
+ #[tokio::test] + async fn as_of_cut_excludes_flushed_at_or_above_active() { + use crate::dataset::mem_wal::scanner::data_source::{ + AsOfCut, LsmDataSource, LsmGeneration, + }; + use crate::dataset::{Dataset, WriteParams}; + use arrow_array::RecordBatchIterator; + use std::collections::HashMap; + + // A flushed generation 2 holding pk=5, written as a plain dataset. + let flushed_batch = id_batch(&[5]); + let schema = flushed_batch.schema(); + let tmp = tempfile::tempdir().unwrap(); + let path = format!("{}/gen2", tmp.path().to_str().unwrap()); + let reader = RecordBatchIterator::new(vec![Ok(flushed_batch)], schema.clone()); + Dataset::write(reader, &path, Some(WriteParams::default())) + .await + .unwrap(); + + let shard = Uuid::new_v4(); + let sources = vec![LsmDataSource::FlushedMemTable { + path, + shard_id: shard, + generation: LsmGeneration::memtable(2), + }]; + + // active_generation 2 (gen 2 flushed at/after the snapshot): excluded. + let at: HashMap = [( + shard, + AsOfCut { + active_generation: 2, + active_batch_count: u64::MAX, + }, + )] + .into_iter() + .collect(); + let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&at)) + .await + .unwrap(); + assert!(!blocks(&sets, 5)); + + // active_generation 3 (gen 2 strictly below, immutable): included. 
+ let above: HashMap = [( + shard, + AsOfCut { + active_generation: 3, + active_batch_count: u64::MAX, + }, + )] + .into_iter() + .collect(); + let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&above)) + .await + .unwrap(); + assert!(blocks(&sets, 5)); + } } diff --git a/rust/lance/src/dataset/mem_wal/scanner/builder.rs b/rust/lance/src/dataset/mem_wal/scanner/builder.rs index ade4164d485..9716a02bcce 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/builder.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/builder.rs @@ -20,7 +20,7 @@ use lance_core::{Error, Result, is_system_column}; use uuid::Uuid; use super::collector::{InMemoryMemTableRef, InMemoryMemTables, LsmDataSourceCollector}; -use super::data_source::ShardSnapshot; +use super::data_source::{AsOfCut, ShardSnapshot}; use super::flushed_cache::FlushedMemTableCache; use super::planner::LsmScanPlanner; use super::point_lookup::LsmPointLookupPlanner; @@ -456,12 +456,26 @@ impl LsmScanner { /// hashes PKs itself. Flushed membership comes from the injected /// [`FlushedMemTableCache`] when one is set. pub async fn contains_pks(&self, pks: &RecordBatch) -> Result> { + self.contains_pks_as_of(pks, None).await + } + + /// As-of variant of [`Self::contains_pks`]. Membership is evaluated against + /// a snapshot-consistent cut of the fresh tier, supplied per shard via + /// `as_of` (see [`AsOfCut`]), so a caller can match the exact tier a prior + /// scan observed and avoid the two-snapshot skew that would drop a base row + /// with no delivered replacement. `None` evaluates against the live tier. 
+ pub async fn contains_pks_as_of( + &self, + pks: &RecordBatch, + as_of: Option<&HashMap>, + ) -> Result> { let sources = self.build_collector().collect()?; let sets = super::block_list::fresh_tier_block_list( &sources, &self.pk_columns, self.session.as_ref(), self.flushed_cache.as_ref(), + as_of, ) .await?; let pk_indices = super::exec::resolve_pk_indices(pks, &self.pk_columns) diff --git a/rust/lance/src/dataset/mem_wal/scanner/data_source.rs b/rust/lance/src/dataset/mem_wal/scanner/data_source.rs index 1a6207f27e3..3288b026b8d 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/data_source.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/data_source.rs @@ -11,6 +11,34 @@ use uuid::Uuid; use crate::dataset::Dataset; use crate::dataset::mem_wal::write::{BatchStore, IndexStore}; +/// A snapshot-consistent cut of one shard's fresh tier, used to evaluate +/// membership "as of" the exact tier a prior scan observed (see +/// [`super::builder::LsmScanner::contains_pks_as_of`]). +/// +/// The only fresh-tier source that grows between two reads is the active +/// memtable (appended batches) and, when it rolls, a new generation. Everything +/// at a lower generation — frozen memtables and flushed generations — is +/// immutable and was fully observed. So the cut is `(active_generation, +/// active_batch_count)`: include all sources strictly below `active_generation` +/// whole, include the active generation only up to `active_batch_count` batches, +/// and exclude anything at a higher generation (it appeared after the snapshot). +/// This uses only the batch count and generation — both always available — +/// rather than per-batch WAL positions, which the memtable does not track on the +/// write path. The bound only ever *excludes* rows the scan did not observe, so +/// a stale cut under-counts (a tolerable stale read) rather than over-counts +/// (which would drop a row with no replacement). 
+#[derive(Debug, Clone, Copy)] +pub struct AsOfCut { + /// Generation of the active memtable the scan observed. In-memory sources at + /// a higher generation appeared after the snapshot and are excluded; sources + /// at a lower generation are immutable and included whole. + pub active_generation: u64, + /// Number of batches in the active memtable at snapshot time. Within the + /// active generation, only batches with index `< active_batch_count` were + /// observed; later appends are excluded. + pub active_batch_count: u64, +} + /// Generation number in LSM tree. /// /// The base table has generation 0. MemTables have positive integers