Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/mem_wal/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub use builder::LsmScanner;
pub use collector::{
ActiveMemTableRef, InMemoryMemTableRef, InMemoryMemTables, LsmDataSourceCollector,
};
pub use data_source::{FlushedGeneration, LsmDataSource, LsmGeneration, ShardSnapshot};
pub use data_source::{AsOfCut, FlushedGeneration, LsmDataSource, LsmGeneration, ShardSnapshot};
pub use flushed_cache::FlushedMemTableCache;
pub use fts_search::{LsmFtsSearchPlanner, SCORE_COLUMN};
pub use point_lookup::LsmPointLookupPlanner;
Expand Down
244 changes: 238 additions & 6 deletions rust/lance/src/dataset/mem_wal/scanner/block_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use lance_core::Result;

use uuid::Uuid;

use super::data_source::{LsmDataSource, LsmGeneration};
use super::data_source::{AsOfCut, LsmDataSource, LsmGeneration};
use super::exec::{compute_pk_hash, resolve_pk_indices};
use super::flushed_cache::{FlushedMemTableCache, open_flushed_dataset};
use crate::dataset::Dataset;
Expand Down Expand Up @@ -109,20 +109,71 @@ pub async fn compute_source_block_lists(
/// (from the cache). Same `Vec<Arc<HashSet<u64>>>` shape the vector-search filter
/// consumes; a base/external reader can drop any row whose PK is in one of them.
/// The base source, if present, is skipped (it is what gets shadowed).
///
/// When `as_of` carries a cut for a source's shard, membership is bounded to
/// that snapshot (see [`AsOfCut`]): in-memory sources at a generation above
/// `active_generation` are excluded (they appeared after the snapshot); the
/// active generation is bounded to its first `active_batch_count` batches;
/// everything below the active generation (frozen and flushed) is immutable and
/// included whole. A shard absent from `as_of` (or `as_of == None`) uses the
/// live tier. The bound only excludes rows, so it can under-count (a tolerable
/// stale read) but never over-count.
pub async fn fresh_tier_block_list(
sources: &[LsmDataSource],
pk_columns: &[String],
session: Option<&Arc<Session>>,
flushed_cache: Option<&Arc<FlushedMemTableCache>>,
as_of: Option<&HashMap<Uuid, AsOfCut>>,
) -> Result<Vec<Arc<HashSet<u64>>>> {
let mut sets = Vec::new();
for source in sources {
let set = match source {
LsmDataSource::BaseTable { .. } => continue,
LsmDataSource::ActiveMemTable { batch_store, .. } => {
Arc::new(pk_hashes_from_batch_store(batch_store, pk_columns)?)
}
LsmDataSource::FlushedMemTable { path, .. } => {
LsmDataSource::ActiveMemTable {
batch_store,
shard_id,
generation,
..
} => match as_of.and_then(|m| m.get(shard_id)) {
Some(cut) => {
let g = generation.as_u64();
if g > cut.active_generation {
// A newer in-memory generation that rolled in after the
// snapshot — not observed by the scan arm.
continue;
}
// The active generation grew between the arm and now; bound
// it to the batches the arm saw. Lower (frozen) generations
// are immutable, so all their batches were observed.
let limit = if g == cut.active_generation {
cut.active_batch_count as usize
} else {
usize::MAX
};
Arc::new(pk_hashes_from_batch_store_bounded(
batch_store,
pk_columns,
limit,
)?)
}
None => Arc::new(pk_hashes_from_batch_store(batch_store, pk_columns)?),
},
LsmDataSource::FlushedMemTable {
path,
shard_id,
generation,
..
} => {
// A flushed generation at or above the active generation was
// produced by a flush after the snapshot (and may hold rows the
// arm never saw), so it must not contribute. Lower generations
// are immutable and were fully observed.
let flushed_after_snapshot = as_of
.and_then(|m| m.get(shard_id))
.is_some_and(|cut| generation.as_u64() >= cut.active_generation);
if flushed_after_snapshot {
continue;
}
flushed_pk_hashes(path, pk_columns, session, flushed_cache).await?
}
};
Expand All @@ -148,6 +199,26 @@ pub fn pk_hashes_from_batch_store(
pk_hashes_from_batches(&batches, pk_columns)
}

/// As-of variant of [`pk_hashes_from_batch_store`]: includes only the first
/// `limit` batches (by append index) — the batches a prior scan observed before
/// the memtable grew. `usize::MAX` includes all (an immutable lower generation).
/// Bounding the membership to what the arm saw keeps the block-list from
/// dropping a base row whose replacement the arm did not deliver.
pub fn pk_hashes_from_batch_store_bounded(
store: &BatchStore,
pk_columns: &[String],
limit: usize,
) -> Result<HashSet<u64>> {
let end = store.len().min(limit);
let mut batches: Vec<RecordBatch> = Vec::with_capacity(end);
for i in 0..end {
if let Some(stored) = store.get(i) {
batches.push(stored.data.clone());
}
}
pk_hashes_from_batches(&batches, pk_columns)
}

/// Hash every row's primary key across `batches` into a membership set.
fn pk_hashes_from_batches(batches: &[RecordBatch], pk_columns: &[String]) -> Result<HashSet<u64>> {
let mut pk_hashes = HashSet::new();
Expand Down Expand Up @@ -296,7 +367,7 @@ mod tests {
// Active gen 2: pk=1,2. Frozen gen 1: pk=3.
let sources = vec![mk(&[1, 2], 2), mk(&[3], 1)];

let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None)
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, None)
.await
.unwrap();

Expand Down Expand Up @@ -457,4 +528,165 @@ mod tests {
assert!(!blocked.contains_key(&(Some(a), g2)));
assert!(!blocked.contains_key(&(Some(b), g2)));
}

/// An as-of cut bounds the active generation to the first
/// `active_batch_count` batches — the ones the scan arm observed before the
/// memtable grew. A later append (higher batch index) is invisible to the
/// check, so a base row is never dropped without the arm having delivered
/// its replacement.
#[tokio::test]
async fn as_of_cut_bounds_active_memtable_by_batch_count() {
use crate::dataset::mem_wal::scanner::data_source::{
AsOfCut, LsmDataSource, LsmGeneration,
};
use crate::dataset::mem_wal::write::IndexStore;
use std::collections::HashMap;

let shard = Uuid::new_v4();
// Three single-row batches: indices 0, 1, 2.
let store = BatchStore::with_capacity(8);
store.append(id_batch(&[1])).unwrap(); // batch 0
store.append(id_batch(&[2])).unwrap(); // batch 1
store.append(id_batch(&[3])).unwrap(); // batch 2 (appended after the arm)
let sources = vec![LsmDataSource::ActiveMemTable {
batch_store: Arc::new(store),
index_store: Arc::new(IndexStore::new()),
schema: id_batch(&[1]).schema(),
shard_id: shard,
generation: LsmGeneration::memtable(1),
}];

// Cut at 2 batches of gen 1: pk=1,2 are members; pk=3 (batch 2) is not.
let as_of: HashMap<Uuid, AsOfCut> = [(
shard,
AsOfCut {
active_generation: 1,
active_batch_count: 2,
},
)]
.into_iter()
.collect();
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&as_of))
.await
.unwrap();
assert!(blocks(&sets, 1));
assert!(blocks(&sets, 2));
assert!(!blocks(&sets, 3));

// No cut → live tier: all three are members.
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, None)
.await
.unwrap();
for id in [1, 2, 3] {
assert!(blocks(&sets, id));
}
}

/// A generation above the active generation rolled in after the snapshot and
/// is excluded whole; a generation below it is immutable (frozen) and
/// included whole regardless of the active batch count.
#[tokio::test]
async fn as_of_cut_excludes_newer_gen_includes_lower_gen() {
use crate::dataset::mem_wal::scanner::data_source::{
AsOfCut, LsmDataSource, LsmGeneration,
};
use crate::dataset::mem_wal::write::IndexStore;
use std::collections::HashMap;

let shard = Uuid::new_v4();
let mk = |ids: &[i32], generation: u64| {
let store = BatchStore::with_capacity(8);
for id in ids {
store.append(id_batch(&[*id])).unwrap();
}
LsmDataSource::ActiveMemTable {
batch_store: Arc::new(store),
index_store: Arc::new(IndexStore::new()),
schema: id_batch(&[1]).schema(),
shard_id: shard,
generation: LsmGeneration::memtable(generation),
}
};
// gen 3 newer (after snapshot), gen 2 == active (bounded to 1 batch),
// gen 1 lower/immutable (whole).
let sources = vec![mk(&[100], 3), mk(&[20, 21], 2), mk(&[1, 2], 1)];

let as_of: HashMap<Uuid, AsOfCut> = [(
shard,
AsOfCut {
active_generation: 2,
active_batch_count: 1,
},
)]
.into_iter()
.collect();
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&as_of))
.await
.unwrap();
assert!(blocks(&sets, 1)); // gen 1, whole
assert!(blocks(&sets, 2)); // gen 1, whole
assert!(blocks(&sets, 20)); // gen 2, batch 0
assert!(!blocks(&sets, 21)); // gen 2, batch 1 — past the cut
assert!(!blocks(&sets, 100)); // gen 3 — after the snapshot
}

/// A flushed generation at or above the active generation was produced by a
/// flush after the snapshot and is excluded; one strictly below it is
/// immutable and included.
#[tokio::test]
async fn as_of_cut_excludes_flushed_at_or_above_active() {
use crate::dataset::mem_wal::scanner::data_source::{
AsOfCut, LsmDataSource, LsmGeneration,
};
use crate::dataset::{Dataset, WriteParams};
use arrow_array::RecordBatchIterator;
use std::collections::HashMap;

// A flushed generation 2 holding pk=5, written as a plain dataset.
let flushed_batch = id_batch(&[5]);
let schema = flushed_batch.schema();
let tmp = tempfile::tempdir().unwrap();
let path = format!("{}/gen2", tmp.path().to_str().unwrap());
let reader = RecordBatchIterator::new(vec![Ok(flushed_batch)], schema.clone());
Dataset::write(reader, &path, Some(WriteParams::default()))
.await
.unwrap();

let shard = Uuid::new_v4();
let sources = vec![LsmDataSource::FlushedMemTable {
path,
shard_id: shard,
generation: LsmGeneration::memtable(2),
}];

// active_generation 2 (gen 2 flushed at/after the snapshot): excluded.
let at: HashMap<Uuid, AsOfCut> = [(
shard,
AsOfCut {
active_generation: 2,
active_batch_count: u64::MAX,
},
)]
.into_iter()
.collect();
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&at))
.await
.unwrap();
assert!(!blocks(&sets, 5));

// active_generation 3 (gen 2 strictly below, immutable): included.
let above: HashMap<Uuid, AsOfCut> = [(
shard,
AsOfCut {
active_generation: 3,
active_batch_count: u64::MAX,
},
)]
.into_iter()
.collect();
let sets = fresh_tier_block_list(&sources, &["id".to_string()], None, None, Some(&above))
.await
.unwrap();
assert!(blocks(&sets, 5));
}
}
16 changes: 15 additions & 1 deletion rust/lance/src/dataset/mem_wal/scanner/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use lance_core::{Error, Result, is_system_column};
use uuid::Uuid;

use super::collector::{InMemoryMemTableRef, InMemoryMemTables, LsmDataSourceCollector};
use super::data_source::ShardSnapshot;
use super::data_source::{AsOfCut, ShardSnapshot};
use super::flushed_cache::FlushedMemTableCache;
use super::planner::LsmScanPlanner;
use super::point_lookup::LsmPointLookupPlanner;
Expand Down Expand Up @@ -456,12 +456,26 @@ impl LsmScanner {
/// hashes PKs itself. Flushed membership comes from the injected
/// [`FlushedMemTableCache`] when one is set.
pub async fn contains_pks(&self, pks: &RecordBatch) -> Result<Vec<bool>> {
self.contains_pks_as_of(pks, None).await
}

/// As-of variant of [`Self::contains_pks`]. Membership is evaluated against
/// a snapshot-consistent cut of the fresh tier, supplied per shard via
/// `as_of` (see [`AsOfCut`]), so a caller can match the exact tier a prior
/// scan observed and avoid the two-snapshot skew that would drop a base row
/// with no delivered replacement. `None` evaluates against the live tier.
pub async fn contains_pks_as_of(
&self,
pks: &RecordBatch,
as_of: Option<&HashMap<Uuid, AsOfCut>>,
) -> Result<Vec<bool>> {
let sources = self.build_collector().collect()?;
let sets = super::block_list::fresh_tier_block_list(
&sources,
&self.pk_columns,
self.session.as_ref(),
self.flushed_cache.as_ref(),
as_of,
)
.await?;
let pk_indices = super::exec::resolve_pk_indices(pks, &self.pk_columns)
Expand Down
28 changes: 28 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,34 @@ use uuid::Uuid;
use crate::dataset::Dataset;
use crate::dataset::mem_wal::write::{BatchStore, IndexStore};

/// A snapshot-consistent cut of one shard's fresh tier, used to evaluate
/// membership "as of" the exact tier a prior scan observed (see
/// [`super::builder::LsmScanner::contains_pks_as_of`]).
///
/// The only fresh-tier source that grows between two reads is the active
/// memtable (appended batches) and, when it rolls, a new generation. Everything
/// at a lower generation — frozen memtables and flushed generations — is
/// immutable and was fully observed. So the cut is `(active_generation,
/// active_batch_count)`: include all sources strictly below `active_generation`
/// whole, include the active generation only up to `active_batch_count` batches,
/// and exclude anything at a higher generation (it appeared after the snapshot).
/// This uses only the batch count and generation — both always available —
/// rather than per-batch WAL positions, which the memtable does not track on the
/// write path. The bound only ever *excludes* rows the scan did not observe, so
/// a stale cut under-counts (a tolerable stale read) rather than over-counts
/// (which would drop a row with no replacement).
#[derive(Debug, Clone, Copy)]
pub struct AsOfCut {
/// Generation of the active memtable the scan observed. In-memory sources at
/// a higher generation appeared after the snapshot and are excluded; sources
/// at a lower generation are immutable and included whole.
pub active_generation: u64,
/// Number of batches in the active memtable at snapshot time. Within the
/// active generation, only batches with index `< active_batch_count` were
/// observed; later appends are excluded.
pub active_batch_count: u64,
}

/// Generation number in LSM tree.
///
/// The base table has generation 0. MemTables have positive integers
Expand Down
Loading