diff --git a/rust/lance-core/src/utils.rs b/rust/lance-core/src/utils.rs index c202329838c..51d72c3dae3 100644 --- a/rust/lance-core/src/utils.rs +++ b/rust/lance-core/src/utils.rs @@ -15,6 +15,7 @@ pub mod hash; pub mod io_stats; pub mod parse; pub mod path; +pub mod row_addr_remap; pub mod tempfile; pub mod testing; pub mod tokio; diff --git a/rust/lance-core/src/utils/row_addr_remap.rs b/rust/lance-core/src/utils/row_addr_remap.rs new file mode 100644 index 00000000000..c52ded07fe4 --- /dev/null +++ b/rust/lance-core/src/utils/row_addr_remap.rs @@ -0,0 +1,376 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Compact row-address remapping for compaction. +//! +//! Compaction rewrites rows into new fragments, so indices that store physical +//! row addresses need an old-address to new-address mapping without building an +//! O(total rows) `HashMap>`. +//! +//! Layout: +//! +//! * Old rows: `old_fragment_id -> (old_offsets, old_rows_before)` +//! * `old_offsets`: rewritten old row offsets in this old fragment. +//! * `old_rows_before`: rewritten row count before this old fragment. +//! * New rows: ordered new-fragment ranges +//! `(fragment_id, new_rows_before, physical_rows)` +//! * `new_rows_before`: rewritten row count before this new fragment. +//! +//! Lookup: +//! +//! * An address whose fragment was not rewritten returns `None`. +//! * For an address whose fragment was rewritten: +//! * Read `(old_offsets, old_rows_before)` from the old-row layout. +//! * If `offset` is not in `old_offsets`, return `Some(None)` because the +//! row was deleted. +//! * Otherwise, `old_offsets.rank(offset) - 1` is this row's 0-based +//! position among rewritten old rows in this old fragment. Add +//! `old_rows_before` to get `k`, the row's 0-based position among all +//! rewritten old rows. +//! * In the new-row layout, find the range +//! `(fragment_id, new_rows_before, physical_rows)` where +//! `new_rows_before <= k < new_rows_before + physical_rows`. +//! * The new address is `(fragment_id, k - new_rows_before)`. +//! +//! Ordering: +//! +//! Compact remap does not store each old-to-new row mapping. It computes `k` +//! from the old-row layout, then maps it to the k-th row written to the new +//! fragments. This requires the reader-to-writer pipeline to preserve row order. +//! +//! * `old_frag_ids` must match the order old fragments are read. Within each +//! old fragment, rewritten rows are interpreted by ascending old row offset. +//! * `new_frags` must match the order new rows are written. +//! * Current compaction satisfies this because it scans selected fragments in +//! order and writes the resulting stream without reordering rows. + +use crate::utils::address::RowAddress; +use crate::{Error, Result}; +use roaring::{RoaringBitmap, RoaringTreemap}; +use std::collections::HashMap; + +/// A queryable row-address remapping with the exact semantics of +/// `HashMap>::get(&addr).copied()`: +/// +/// * `None` — the address is not affected by this remap (keep it unchanged) +/// * `Some(None)` — the row was deleted +/// * `Some(Some(addr))` — the row moved to `addr` +#[derive(Clone)] +pub enum RowAddrRemap { + Compact(CompactRowAddrRemap), + Explicit(HashMap>), +} + +impl RowAddrRemap { + pub fn compact(groups: impl IntoIterator) -> Result { + Ok(Self::Compact(CompactRowAddrRemap::new(groups)?)) + } + + /// An empty remap that leaves every address unchanged. + pub fn empty() -> Self { + Self::Explicit(HashMap::new()) + } + + /// Look up `addr`. See [`RowAddrRemap`] for the tri-state return semantics. + #[inline] + pub fn get(&self, addr: u64) -> Option> { + match self { + Self::Compact(c) => c.get(addr), + Self::Explicit(m) => m.get(&addr).copied(), + } + } + + pub fn is_empty(&self) -> bool { + match self { + Self::Compact(c) => c.is_empty(), + Self::Explicit(m) => m.is_empty(), + } + } + + pub fn affected_fragments(&self) -> RoaringBitmap { + match self { + Self::Compact(c) => RoaringBitmap::from_iter(c.frag_to_group.keys().copied()), + Self::Explicit(m) => RoaringBitmap::from_iter(m.keys().map(|addr| (addr >> 32) as u32)), + } + } + + pub fn fully_deleted_fragments(&self) -> Option { + match self { + Self::Compact(c) => c.fully_deleted_fragments(), + Self::Explicit(m) => { + if m.values().all(|v| v.is_none()) { + Some(RoaringBitmap::from_iter( + m.keys().map(|addr| (addr >> 32) as u32), + )) + } else { + None + } + } + } + } +} + +impl From>> for RowAddrRemap { + fn from(map: HashMap>) -> Self { + Self::Explicit(map) + } +} + +/// Input describing one rewrite group: the old row addresses that were +/// rewritten plus the fragment layout before/after the rewrite. +pub struct GroupInput { + /// Old row addresses that were read and re-written into the new fragments. + pub rewritten_old_row_addrs: RoaringTreemap, + /// Old fragment ids covered by this group. + pub old_frag_ids: Vec, + /// New fragments produced by this group, as `(fragment_id, physical_rows)`, + pub new_frags: Vec<(u32, u32)>, +} + +#[derive(Clone)] +struct GroupRemap { + /// Old fragment id -> (rewritten old row offsets in that fragment, + /// rewritten row count before this fragment in the group). + frags: HashMap, + /// New fragment ranges as `(fragment_id, rewritten_rows_before, physical_rows)`, + /// used to map a rewritten row's group-local index to its new address via binary search. + new_frag_row_ranges: Vec<(u32, u64, u32)>, +} + +impl GroupRemap { + fn new(input: GroupInput) -> Result { + let new_frag_row_ranges = { + let mut rewritten_rows_before = 0u64; + input + .new_frags + .into_iter() + .filter(|(_, physical_rows)| *physical_rows > 0) + .map(|(frag_id, physical_rows)| { + let range = (frag_id, rewritten_rows_before, physical_rows); + rewritten_rows_before += physical_rows as u64; + range + }) + .collect() + }; + + let mut per_frag: HashMap = input + .rewritten_old_row_addrs + .bitmaps() + .map(|(frag_id, bitmap)| (frag_id, bitmap.clone())) + .collect(); + let mut frags = HashMap::new(); + let mut rewritten_rows_before = 0u64; + for &frag_id in &input.old_frag_ids { + // A fragment with no rewritten rows (fully deleted) contributes + // nothing to the rewritten row sequence. + if let Some(bitmap) = per_frag.remove(&frag_id) { + let num_rewritten_rows = bitmap.len(); + frags.insert(frag_id, (bitmap, rewritten_rows_before)); + rewritten_rows_before += num_rewritten_rows; + } + } + // Rewritten old row addresses must reference only fragments listed in `old_frag_ids`. + if !per_frag.is_empty() { + return Err(Error::invalid_input(format!( + "compaction rewritten old row addresses reference fragments {:?} not in the rewrite group's old fragments {:?}", + per_frag.keys().collect::>(), + input.old_frag_ids, + ))); + } + + Ok(Self { + frags, + new_frag_row_ranges, + }) + } + + fn compute_new_addr(&self, rewritten_row_index: u64) -> u64 { + let idx = + match self + .new_frag_row_ranges + .binary_search_by(|(_, rewritten_rows_before, _)| { + rewritten_rows_before.cmp(&rewritten_row_index) + }) { + Ok(i) => i, + Err(i) => i - 1, + }; + let (frag_id, rewritten_rows_before, _rows) = self.new_frag_row_ranges[idx]; + let offset = (rewritten_row_index - rewritten_rows_before) as u32; + u64::from(RowAddress::new_from_parts(frag_id, offset)) + } + + /// Compute the new address for an old row in this group. + /// Returns `None` if the old row was not rewritten. + #[inline] + fn get(&self, frag: u32, offset: u32) -> Option { + match self.frags.get(&frag) { + Some((bitmap, rewritten_rows_before)) if bitmap.contains(offset) => { + let rewritten_row_index = rewritten_rows_before + bitmap.rank(offset) - 1; + Some(self.compute_new_addr(rewritten_row_index)) + } + _ => None, + } + } +} + +/// Compact remap backed by per-group rewritten row bitmaps + new-fragment layouts. +#[derive(Clone)] +pub struct CompactRowAddrRemap { + groups: Vec, + /// Old fragment id -> index into `groups`. Size is O(#fragments), not rows. + frag_to_group: HashMap, +} + +impl CompactRowAddrRemap { + fn new(groups: impl IntoIterator) -> Result { + let mut frag_to_group = HashMap::new(); + let mut group_remaps = Vec::new(); + for input in groups { + let gi = group_remaps.len(); + for &frag_id in &input.old_frag_ids { + frag_to_group.insert(frag_id, gi); + } + group_remaps.push(GroupRemap::new(input)?); + } + Ok(Self { + groups: group_remaps, + frag_to_group, + }) + } + + #[inline] + pub fn get(&self, addr: u64) -> Option> { + let frag = (addr >> 32) as u32; + // Not in any rewrite group -> unaffected by this remap. + let gi = *self.frag_to_group.get(&frag)?; + Some(self.groups[gi].get(frag, addr as u32)) + } + + pub fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + fn fully_deleted_fragments(&self) -> Option { + // A group with any rewritten row moved at least one row. + if self.groups.iter().any(|g| !g.frags.is_empty()) { + return None; + } + Some(RoaringBitmap::from_iter(self.frag_to_group.keys().copied())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(frag: u32, offset: u32) -> u64 { + u64::from(RowAddress::new_from_parts(frag, offset)) + } + + #[test] + fn test_compact_lookup() { + // Group A: out-of-order old frags [4, 3], split new frags (11 empty), + // some deletions. frag 4 (5 rows) keeps 0,2,4; frag 3 keeps 0,1, so the + // rewritten rows (4,0)(4,2)(4,4)(3,0)(3,1) go to new frags 10(2), 12(3). + // Group B is a fully-deleted fragment. + let group_a = GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter([ + addr(4, 0), + addr(4, 2), + addr(4, 4), + addr(3, 0), + addr(3, 1), + ]), + old_frag_ids: vec![4, 3], + new_frags: vec![(10, 2), (11, 0), (12, 3)], + }; + let group_b = GroupInput { + rewritten_old_row_addrs: RoaringTreemap::new(), + old_frag_ids: vec![7], + new_frags: vec![], + }; + let remap = RowAddrRemap::compact([group_a, group_b]).unwrap(); + + // Moves, in rewrite order; frag 4 comes first despite the larger id. + assert_eq!(remap.get(addr(4, 0)), Some(Some(addr(10, 0)))); + assert_eq!(remap.get(addr(4, 2)), Some(Some(addr(10, 1)))); + // Rank 2 skips the zero-row new fragment 11 and lands in fragment 12. + assert_eq!(remap.get(addr(4, 4)), Some(Some(addr(12, 0)))); + assert_eq!(remap.get(addr(3, 0)), Some(Some(addr(12, 1)))); + assert_eq!(remap.get(addr(3, 1)), Some(Some(addr(12, 2)))); + // Deleted offsets inside a rewritten fragment. + assert_eq!(remap.get(addr(4, 1)), Some(None)); + assert_eq!(remap.get(addr(4, 3)), Some(None)); + // Covered but fully-deleted fragment -> Some(None), not None. + assert_eq!(remap.get(addr(7, 0)), Some(None)); + // Fragment in no group -> unaffected. + assert_eq!(remap.get(addr(9, 0)), None); + assert!(!remap.is_empty()); + } + + #[test] + fn test_fragment_sets() { + // No rewritten rows at all: every covered fragment is fully deleted. + let dead = RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::new(), + old_frag_ids: vec![3, 7], + new_frags: vec![], + }]) + .unwrap(); + assert_eq!( + dead.fully_deleted_fragments(), + Some(RoaringBitmap::from_iter([3u32, 7u32])) + ); + assert_eq!( + dead.affected_fragments(), + RoaringBitmap::from_iter([3u32, 7u32]) + ); + + // At least one rewritten row -> not fully deleted, but both covered + // fragments (including the fully-deleted frag 1) are still affected. + let alive = RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter([addr(0, 0)]), + old_frag_ids: vec![0, 1], + new_frags: vec![(10, 1)], + }]) + .unwrap(); + assert!(alive.fully_deleted_fragments().is_none()); + assert_eq!( + alive.affected_fragments(), + RoaringBitmap::from_iter([0u32, 1u32]) + ); + } + + #[test] + fn test_compact_rejects_rewritten_addrs_outside_old_frags() { + // Rewritten addresses reference frag 5, not in old_frag_ids. The count + // still matches (2 == 2), so only the per-fragment split catches it. + let input = GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter([addr(0, 0), addr(5, 0)]), + old_frag_ids: vec![0], + new_frags: vec![(10, 2)], + }; + assert!(RowAddrRemap::compact([input]).is_err()); + } + + #[test] + fn test_explicit_and_empty() { + // Explicit covers arbitrary maps the compact form can't express. + let mut map = HashMap::new(); + map.insert(addr(2, 0), Some(addr(9, 9))); + map.insert(addr(5, 1), None); + let remap = RowAddrRemap::Explicit(map); + assert_eq!(remap.get(addr(2, 0)), Some(Some(addr(9, 9)))); + assert_eq!(remap.get(addr(5, 1)), Some(None)); + assert_eq!(remap.get(addr(2, 1)), None); + // affected_fragments over an explicit map: the fragment of every key. + assert_eq!( + remap.affected_fragments(), + RoaringBitmap::from_iter([2u32, 5u32]) + ); + + let empty = RowAddrRemap::empty(); + assert!(empty.is_empty()); + assert_eq!(empty.get(addr(0, 0)), None); + } +} diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index daec92339f8..730dc1e5c18 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -12,6 +12,7 @@ use datafusion::functions::string::contains::ContainsFunc; use datafusion::functions_nested::array_has; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::{Column, scalar::ScalarValue}; +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::pin::Pin; @@ -1018,7 +1019,7 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { /// Remap the row ids, creating a new remapped version of this index in `dest_store` async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result; diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 1ae2faf6e6b..a90f41329b4 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ any::Any, cmp::Reverse, @@ -763,7 +764,7 @@ impl ScalarIndex for BitmapIndex { /// Remap the row ids, creating a new remapped version of this index in `dest_store` async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { let state = self.load_bitmap_index_state().await?; @@ -1479,7 +1480,7 @@ impl BitmapIndexPlugin { /// Remaps every bitmap in a materialized bitmap-index state using row-id mappings. pub(crate) fn remap_bitmap_state( state: HashMap, - mapping: &HashMap>, + mapping: &RowAddrRemap, ) -> HashMap { state .into_iter() @@ -1487,10 +1488,7 @@ impl BitmapIndexPlugin { let remapped_bitmap = RowAddrTreeMap::from_iter(bitmap.row_addrs().unwrap().filter_map(|addr| { let addr_as_u64 = u64::from(addr); - mapping - .get(&addr_as_u64) - .copied() - .unwrap_or(Some(addr_as_u64)) + mapping.get(addr_as_u64).unwrap_or(Some(addr_as_u64)) })); (key, remapped_bitmap) }) @@ -1817,7 +1815,7 @@ mod tests { use lance_core::utils::{address::RowAddress, tempfile::TempObjDir}; use lance_io::object_store::ObjectStore; use lance_select::RowSetOps; - use std::collections::HashMap; + use rstest::rstest; fn assert_state_roundtrips(state: &BitmapIndexState) { let mut buf = Vec::new(); @@ -2317,8 +2315,44 @@ mod tests { assert_eq!(red_rows_2, vec![0, 3, 6, 10, 11]); } + // frags 1 and 2 (3 rows each) are compacted into frag 3: the 6 rows are + // rewritten in order to frag 3 offsets 0..6. + fn bitmap_remap_compact() -> RowAddrRemap { + use lance_core::utils::row_addr_remap::GroupInput; + use roaring::RoaringTreemap; + RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter( + (0..3) + .map(|o| u64::from(RowAddress::new_from_parts(1, o))) + .chain((0..3).map(|o| u64::from(RowAddress::new_from_parts(2, o)))), + ), + old_frag_ids: vec![1, 2], + new_frags: vec![(3, 6)], + }]) + .unwrap() + } + + fn bitmap_remap_explicit() -> RowAddrRemap { + // The same mapping, listed out explicitly. + RowAddrRemap::Explicit( + (0..6u32) + .map(|i| { + let (f, o) = if i < 3 { (1, i) } else { (2, i - 3) }; + ( + u64::from(RowAddress::new_from_parts(f, o)), + Some(u64::from(RowAddress::new_from_parts(3, i))), + ) + }) + .collect(), + ) + } + + // remap must behave identically whether the mapping is compact or explicit. + #[rstest] + #[case(bitmap_remap_compact())] + #[case(bitmap_remap_explicit())] #[tokio::test] - async fn test_remap_bitmap_with_null() { + async fn test_remap_bitmap_with_null(#[case] remap: RowAddrRemap) { use arrow_array::UInt32Array; // Create a temporary store. @@ -2383,38 +2417,8 @@ mod tests { assert_eq!(index.index_map.len(), 2); // 2 non-null values (1 and 2) assert!(!index.null_map.is_empty()); // Should have null values - // Create a remap that simulates compaction of frags 1 and 2 into frag 3 - let mut row_addr_map = HashMap::>::new(); - row_addr_map.insert( - RowAddress::new_from_parts(1, 0).into(), - Some(RowAddress::new_from_parts(3, 0).into()), - ); - row_addr_map.insert( - RowAddress::new_from_parts(1, 1).into(), - Some(RowAddress::new_from_parts(3, 1).into()), - ); - row_addr_map.insert( - RowAddress::new_from_parts(1, 2).into(), - Some(RowAddress::new_from_parts(3, 2).into()), - ); - row_addr_map.insert( - RowAddress::new_from_parts(2, 0).into(), - Some(RowAddress::new_from_parts(3, 3).into()), - ); - row_addr_map.insert( - RowAddress::new_from_parts(2, 1).into(), - Some(RowAddress::new_from_parts(3, 4).into()), - ); - row_addr_map.insert( - RowAddress::new_from_parts(2, 2).into(), - Some(RowAddress::new_from_parts(3, 5).into()), - ); - // Perform remap - index - .remap(&row_addr_map, test_store.as_ref()) - .await - .unwrap(); + index.remap(&remap, test_store.as_ref()).await.unwrap(); // Reload and check let reloaded_idx = BitmapIndex::load(test_store, None, &LanceCache::no_cache()) diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 856f08af772..927bf4593b2 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -20,12 +20,13 @@ use arrow_schema::{DataType, Field}; use lance_arrow_stats::StatisticsAccumulator; use lance_core::utils::bloomfilter::as_bytes; use lance_core::utils::bloomfilter::sbbf::{Sbbf, SbbfBuilder}; +use lance_core::utils::row_addr_remap::RowAddrRemap; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; use datafusion::execution::SendableRecordBatchStream; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use crate::scalar::FragReuseIndex; use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult}; @@ -431,7 +432,7 @@ impl ScalarIndex for BloomFilterIndex { async fn remap( &self, - _mapping: &HashMap>, + _mapping: &RowAddrRemap, _dest_store: &dyn IndexStore, ) -> Result { Err(Error::invalid_input_source( diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6128248308e..2c75c568cf9 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ any::Any, cmp::Ordering, @@ -2135,7 +2136,7 @@ impl ScalarIndex for BTreeIndex { async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { // (part_id, path) @@ -2302,7 +2303,7 @@ pub trait BTreeSubIndex: Debug + Send + Sync + DeepSizeOf { async fn remap_subindex( &self, serialized: RecordBatch, - mapping: &HashMap>, + mapping: &RowAddrRemap, ) -> Result; } @@ -3246,6 +3247,7 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { #[cfg(test)] mod tests { + use lance_core::utils::row_addr_remap::RowAddrRemap; use std::sync::atomic::Ordering; use std::{collections::HashMap, sync::Arc}; @@ -3347,7 +3349,7 @@ mod tests { // Remap with a no-op mapping. The remapped index should be identical to the original index - .remap(&HashMap::default(), remap_store.as_ref()) + .remap(&RowAddrRemap::empty(), remap_store.as_ref()) .await .unwrap(); @@ -4778,7 +4780,7 @@ mod tests { // Remap with a no-op mapping. The remapped index should be identical to the original ranged_index - .remap(&HashMap::default(), remap_store.as_ref()) + .remap(&RowAddrRemap::empty(), remap_store.as_ref()) .await .unwrap(); @@ -5106,7 +5108,10 @@ mod tests { )); // Remap the index with our deletion mapping - index.remap(&mapping, remap_store.as_ref()).await.unwrap(); + index + .remap(&RowAddrRemap::Explicit(mapping), remap_store.as_ref()) + .await + .unwrap(); let remapped_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache()) .await diff --git a/rust/lance-index/src/scalar/btree/flat.rs b/rust/lance-index/src/scalar/btree/flat.rs index 212ef6490be..0ed7c6a5766 100644 --- a/rust/lance-index/src/scalar/btree/flat.rs +++ b/rust/lance-index/src/scalar/btree/flat.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::collections::HashMap; +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ops::Bound, sync::Arc}; use arrow_array::Array; @@ -92,10 +92,7 @@ impl FlatIndex { NullableRowAddrSet::new(self.all_addrs_map.clone(), Default::default()) } - pub fn remap_batch( - batch: RecordBatch, - mapping: &HashMap>, - ) -> Result { + pub fn remap_batch(batch: RecordBatch, mapping: &RowAddrRemap) -> Result { let row_ids = batch.column(IDS_COL_IDX).as_primitive::(); let val_idx_and_new_id = row_ids .values() @@ -103,8 +100,7 @@ impl FlatIndex { .enumerate() .filter_map(|(idx, old_id)| { mapping - .get(old_id) - .copied() + .get(*old_id) .unwrap_or(Some(*old_id)) .map(|new_id| (idx, new_id)) }) @@ -284,7 +280,11 @@ mod tests { use super::*; use arrow_array::{record_batch, types::Int32Type}; use datafusion_common::ScalarValue; + use lance_core::utils::row_addr_remap::GroupInput; use lance_datagen::{RowCount, array, gen_batch}; + use roaring::RoaringTreemap; + use rstest::rstest; + use std::collections::HashMap; fn example_index() -> FlatIndex { let batch = gen_batch() @@ -389,9 +389,11 @@ mod tests { // 3 -> delete // Keep remaining as is let mapping = HashMap::>::from_iter(vec![(0, Some(2000)), (3, None)]); - let remapped = - FlatIndex::try_new(FlatIndex::remap_batch((*index.data).clone(), &mapping).unwrap()) - .unwrap(); + let remapped = FlatIndex::try_new( + FlatIndex::remap_batch((*index.data).clone(), &RowAddrRemap::Explicit(mapping)) + .unwrap(), + ) + .unwrap(); let expected = FlatIndex::try_new( gen_batch() @@ -404,18 +406,22 @@ mod tests { assert_eq!(remapped.data, expected.data); } - // It's possible, during compaction, that an entire page of values is deleted. We just serialize - // it as an empty record batch. - #[tokio::test] - async fn test_remap_to_nothing() { + // An entire page (frag 0) is deleted during compaction. remap_batch must + // drop every row regardless of which RowAddrRemap mode expresses it. + // example_index holds row ids 5, 0, 3, 100, all in frag 0. + #[rstest] + #[case::compact(RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::new(), + old_frag_ids: vec![0], + new_frags: vec![], + }]) + .unwrap())] + #[case::explicit(RowAddrRemap::Explicit( + [5u64, 0, 3, 100].into_iter().map(|id| (id, None)).collect(), + ))] + fn test_remap_to_nothing(#[case] remap: RowAddrRemap) { let index = example_index(); - let mapping = HashMap::>::from_iter(vec![ - (5, None), - (0, None), - (3, None), - (100, None), - ]); - let remapped = FlatIndex::remap_batch((*index.data).clone(), &mapping).unwrap(); + let remapped = FlatIndex::remap_batch((*index.data).clone(), &remap).unwrap(); assert_eq!(remapped.num_rows(), 0); } diff --git a/rust/lance-index/src/scalar/fmindex.rs b/rust/lance-index/src/scalar/fmindex.rs index 9677f7471ea..e2561c9fb07 100644 --- a/rust/lance-index/src/scalar/fmindex.rs +++ b/rust/lance-index/src/scalar/fmindex.rs @@ -31,6 +31,7 @@ use datafusion::execution::SendableRecordBatchStream; use futures::StreamExt; use lance_core::cache::LanceCache; use lance_core::deepsize::DeepSizeOf; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::{Error, ROW_ADDR, Result}; use roaring::RoaringBitmap; @@ -1357,11 +1358,7 @@ impl ScalarIndex for FMIndexScalarIndex { fn can_remap(&self) -> bool { false } - async fn remap( - &self, - _: &HashMap>, - _: &dyn IndexStore, - ) -> Result { + async fn remap(&self, _: &RowAddrRemap, _: &dyn IndexStore) -> Result { Err(Error::not_supported("Fm does not support remap")) } async fn update( diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 24b1eb50203..6eb6f6f470d 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -25,6 +25,7 @@ use lance_arrow::{ARROW_EXT_NAME_KEY, iter_str_array}; use lance_core::cache::LanceCache; use lance_core::deepsize::DeepSizeOf; use lance_core::error::LanceOptionExt; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cpus, spawn_cpu}; use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result}; use lance_io::object_store::ObjectStore; @@ -534,7 +535,7 @@ impl InvertedIndexBuilder { pub async fn remap( &mut self, - mapping: &HashMap>, + mapping: &RowAddrRemap, src_store: Arc, dest_store: &dyn IndexStore, ) -> Result> { @@ -846,7 +847,7 @@ impl InnerBuilder { self.posting_lists = posting_lists; } - pub async fn remap(&mut self, mapping: &HashMap>) -> Result<()> { + pub async fn remap(&mut self, mapping: &RowAddrRemap) -> Result<()> { // for the docs, we need to remove the rows that are removed from the doc set, // and update the row ids of the rows that are updated let removed = self.docs.remap(mapping); @@ -885,7 +886,7 @@ impl InnerBuilder { mapping.insert(*row_id, None); } } - self.remap(&mapping).await + self.remap(&RowAddrRemap::Explicit(mapping)).await } pub fn merge_from(&mut self, other: Self) -> Result<()> { @@ -3338,7 +3339,10 @@ mod tests { // Remap the index via the ScalarIndex trait method use crate::scalar::ScalarIndex; let mapping = HashMap::from([(0u64, Some(50 << 32))]); - index.remap(&mapping, dest_store.as_ref()).await.unwrap(); + index + .remap(&RowAddrRemap::Explicit(mapping), dest_store.as_ref()) + .await + .unwrap(); // Reload from dest and verify deleted fragments are preserved let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()) diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 56547c6510b..94d820d5377 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::fmt::{Debug, Display}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; @@ -1150,7 +1151,7 @@ impl ScalarIndex for InvertedIndex { async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { let files = self @@ -4799,16 +4800,16 @@ impl DocSet { // remap the row ids to the new row ids // returns the removed doc ids - pub fn remap(&mut self, mapping: &HashMap>) -> Vec { + pub fn remap(&mut self, mapping: &RowAddrRemap) -> Vec { let mut removed = Vec::new(); let len = self.len(); let row_ids = std::mem::replace(&mut self.row_ids, Vec::with_capacity(len)); let num_tokens = std::mem::replace(&mut self.num_tokens, Vec::with_capacity(len)); self.total_tokens = 0; for (doc_id, (row_id, num_token)) in std::iter::zip(row_ids, num_tokens).enumerate() { - match mapping.get(&row_id) { + match mapping.get(row_id) { Some(Some(new_row_id)) => { - self.row_ids.push(*new_row_id); + self.row_ids.push(new_row_id); self.num_tokens.push(num_token); self.total_tokens += num_token as u64; } @@ -5848,7 +5849,10 @@ mod tests { let mut builder = index.into_builder().await.unwrap(); let mapping = HashMap::from([(0, None), (2, Some(3))]); - builder.remap(&mapping).await.unwrap(); + builder + .remap(&RowAddrRemap::Explicit(mapping)) + .await + .unwrap(); // after remap, the doc 0 is removed, and the doc 2 is updated to 3 assert_eq!(builder.tokens.len(), 1); @@ -5863,7 +5867,10 @@ mod tests { // remap to delete all docs let mapping = HashMap::from([(1, None), (3, None)]); - builder.remap(&mapping).await.unwrap(); + builder + .remap(&RowAddrRemap::Explicit(mapping)) + .await + .unwrap(); assert_eq!(builder.tokens.len(), 0); assert_eq!(builder.posting_lists.len(), 0); diff --git a/rust/lance-index/src/scalar/json.rs b/rust/lance-index/src/scalar/json.rs index 7adf055db61..8723de9dfb8 100644 --- a/rust/lance-index/src/scalar/json.rs +++ b/rust/lance-index/src/scalar/json.rs @@ -1,8 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ - collections::HashMap, ops::Bound, sync::{Arc, Mutex}, }; @@ -118,7 +118,7 @@ impl ScalarIndex for JsonIndex { async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { let target_created = self.target_index.remap(mapping, dest_store).await?; diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index cf357d89585..0cf92a6d8d1 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ any::Any, collections::HashMap, @@ -215,7 +216,7 @@ impl ScalarIndex for LabelListIndex { /// Remap the row ids, creating a new remapped version of this index in `dest_store` async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { let state = self.values_index.load_bitmap_index_state().await?; @@ -223,10 +224,7 @@ impl ScalarIndex for LabelListIndex { let remapped_nulls = RowAddrTreeMap::from_iter(self.list_nulls.row_addrs().unwrap().filter_map(|addr| { let addr_as_u64 = u64::from(addr); - mapping - .get(&addr_as_u64) - .copied() - .unwrap_or(Some(addr_as_u64)) + mapping.get(addr_as_u64).unwrap_or(Some(addr_as_u64)) })); let file = write_label_list_bitmap_index( remapped_state, diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 562945b8f0d..55392c0e3df 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -543,6 +543,7 @@ mod tests { use datafusion_common::ScalarValue; use futures::FutureExt; use lance_core::ROW_ID; + use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::utils::tempfile::TempDir; use lance_datagen::{ArrayGeneratorExt, BatchCount, ByteCount, RowCount, array, gen_batch}; use lance_select::{RowAddrTreeMap, RowSetOps}; @@ -1619,7 +1620,7 @@ mod tests { let remapped_dir = TempDir::default(); let remapped_store = test_store(&remapped_dir); index - .remap(&mapping, remapped_store.as_ref()) + .remap(&RowAddrRemap::Explicit(mapping), remapped_store.as_ref()) .await .unwrap(); let remapped_index = BitmapIndex::load(remapped_store, None, &LanceCache::no_cache()) diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 72ef8d53a92..794a37ce3e3 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::any::Any; use std::collections::BTreeMap; use std::iter::once; @@ -327,11 +328,7 @@ impl NGramIndex { }) } - fn remap_batch( - &self, - batch: RecordBatch, - mapping: &HashMap>, - ) -> Result { + fn remap_batch(&self, batch: RecordBatch, mapping: &RowAddrRemap) -> Result { let posting_lists_array = batch .column_by_name(POSTING_LIST_COL) .expect_ok()? @@ -344,8 +341,8 @@ impl NGramIndex { let posting_list = RoaringTreemap::deserialize_from(posting_list)?; let new_posting_list = RoaringTreemap::from_iter(posting_list.into_iter().filter_map(|row_id| { - match mapping.get(&row_id) { - Some(Some(new_row_id)) => Some(*new_row_id), + match mapping.get(row_id) { + Some(Some(new_row_id)) => Some(new_row_id), Some(None) => None, None => Some(row_id), } @@ -485,7 +482,7 @@ impl ScalarIndex for NGramIndex { async fn remap( &self, - mapping: &HashMap>, + mapping: &RowAddrRemap, dest_store: &dyn IndexStore, ) -> Result { let reader = self.store.open_index_file(POSTINGS_FILENAME).await?; @@ -1319,6 +1316,8 @@ impl ScalarIndexPlugin for NGramIndexPlugin { #[cfg(test)] mod tests { + use lance_core::utils::row_addr_remap::RowAddrRemap; + use rstest::rstest; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -1649,7 +1648,10 @@ mod tests { )); let remapping = HashMap::from([(2, Some(100)), (3, None), (4, Some(101))]); - index.remap(&remapping, test_store.as_ref()).await.unwrap(); + index + .remap(&RowAddrRemap::Explicit(remapping), test_store.as_ref()) + .await + .unwrap(); let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) .await @@ -1661,6 +1663,61 @@ mod tests { assert_eq!(null_posting_list, vec![100]); } + // Like `test_ngram_index_remap` but covering both RowAddrRemap modes: rows + // 0..4 of frag 0 are rewritten into frag 10; row 4 is deleted. + fn ngram_remap_compact() -> RowAddrRemap { + use lance_core::utils::row_addr_remap::GroupInput; + use roaring::RoaringTreemap; + RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter(0u64..4), + old_frag_ids: vec![0], + new_frags: vec![(10, 4)], + }]) + .unwrap() + } + + fn ngram_remap_explicit() -> RowAddrRemap { + RowAddrRemap::Explicit( + (0u64..4) + .map(|i| (i, Some((10u64 << 32) | i))) + .chain(std::iter::once((4u64, None))) + .collect(), + ) + } + + #[rstest] + #[case(ngram_remap_compact())] + #[case(ngram_remap_explicit())] + #[tokio::test] + async fn test_ngram_index_remap_compact(#[case] remap: RowAddrRemap) { + let data = simple_data_with_nulls(); + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default()).unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + + let row_ids = row_ids_in_index(&index).await; + assert_eq!(row_ids, vec![0, 1, 2, 3, 4]); + + let new_tmpdir = Arc::new(TempDir::default()); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + new_tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + index.remap(&remap, test_store.as_ref()).await.unwrap(); + + let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) + .await + .unwrap(); + let addr = |offset: u64| (10u64 << 32) | offset; + let row_ids = row_ids_in_index(&index).await; + assert_eq!(row_ids, vec![addr(0), addr(1), addr(2), addr(3)]); + + // rows 2 and 3 are the null docs; both are rewritten into frag 10. + let null_posting_list = get_null_posting_list(&index).await; + assert_eq!(null_posting_list, vec![addr(2), addr(3)]); + } + #[test_log::test(tokio::test)] async fn test_ngram_index_merge() { let data = simple_data_with_nulls(); diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index 5d5ac2a3a92..a29fd59a098 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -34,6 +34,7 @@ use lance_arrow::RecordBatchExt; use lance_core::cache::{CacheKey, LanceCache, WeakLanceCache}; use lance_core::deepsize::DeepSizeOf; use lance_core::utils::address::RowAddress; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::utils::tempfile::TempDir; use lance_core::{Error, ROW_ID, Result}; use lance_datafusion::chunker::chunk_concat_stream; @@ -551,7 +552,7 @@ impl ScalarIndex for RTreeIndex { async fn remap( &self, - _mapping: &HashMap>, + _mapping: &RowAddrRemap, _dest_store: &dyn IndexStore, ) -> Result { Err(Error::invalid_input_source( diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 9f2228740c2..431506c2306 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -24,6 +24,7 @@ use crate::scalar::{ }; use lance_arrow_stats::StatisticsAccumulator; use lance_core::cache::{LanceCache, WeakLanceCache}; +use lance_core::utils::row_addr_remap::RowAddrRemap; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; @@ -33,7 +34,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field}; use datafusion::execution::SendableRecordBatchStream; use datafusion_common::ScalarValue; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult}; use crate::scalar::FragReuseIndex; @@ -611,7 +612,7 @@ impl ScalarIndex for ZoneMapIndex { /// Remap the row ids, creating a new remapped version of this index in `dest_store` async fn remap( &self, - _mapping: &HashMap>, + _mapping: &RowAddrRemap, _dest_store: &dyn IndexStore, ) -> Result { Err(Error::invalid_input_source( diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index 3c5a6601a8a..76b9ccc7a51 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -4,9 +4,10 @@ //! Vector Index //! +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::any::Any; use std::fmt::Debug; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use arrow_array::{ArrayRef, Float32Array, RecordBatch, UInt32Array}; use arrow_schema::Field; @@ -408,7 +409,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index { /// /// If an old row id is not in the mapping then it should be /// left alone. - async fn remap(&mut self, mapping: &HashMap>) -> Result<()>; + async fn remap(&mut self, mapping: &RowAddrRemap) -> Result<()>; /// The metric type of this vector index. fn metric_type(&self) -> DistanceType; diff --git a/rust/lance-index/src/vector/bq/storage.rs b/rust/lance-index/src/vector/bq/storage.rs index bd70f176c5d..41b273f54a3 100644 --- a/rust/lance-index/src/vector/bq/storage.rs +++ b/rust/lance-index/src/vector/bq/storage.rs @@ -1,8 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::borrow::Cow; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::BinaryHeap; use std::ops::Sub; use std::sync::{ Arc, OnceLock, @@ -2304,7 +2305,7 @@ impl QuantizerStorage for RabitQuantizationStorage { Self::try_from_batch(batch, metadata, distance_type, frag_reuse_index) } - fn remap(&self, mapping: &HashMap>) -> Result { + fn remap(&self, mapping: &RowAddrRemap) -> Result { let num_vectors = self.codes.len(); let num_code_bytes = self.codes.value_length() as usize; let codes = self.codes.values().as_primitive::().values(); @@ -2314,10 +2315,10 @@ impl QuantizerStorage for RabitQuantizationStorage { let row_ids = self.row_ids.values(); for (i, row_id) in row_ids.iter().enumerate() { - match mapping.get(row_id) { + match mapping.get(*row_id) { Some(Some(new_id)) => { indices.push(i as u32); - new_row_ids.push(*new_id); + new_row_ids.push(new_id); new_codes.extend(get_rq_code(codes, i, num_vectors, num_code_bytes)); } Some(None) => {} @@ -2485,6 +2486,7 @@ fn get_rq_code( #[cfg(test)] mod tests { use super::*; + use rstest::rstest; use std::collections::{BinaryHeap, HashMap}; use arrow_array::{ArrayRef, Float32Array, Float64Array, UInt64Array}; @@ -3552,7 +3554,7 @@ mod tests { mapping.insert(3, None); mapping.insert(4, Some(104)); - let remapped = storage.remap(&mapping).unwrap(); + let remapped = storage.remap(&RowAddrRemap::Explicit(mapping)).unwrap(); assert!(remapped.metadata().packed); let remapped_batch = remapped.to_batches().unwrap().next().unwrap(); @@ -3569,6 +3571,51 @@ mod tests { assert_codes_eq(remapped_codes, &repacked); } + // Rows 0..25 of frag 0 are rewritten in order into frag 1; rows 25..50 are + // deleted. remap must behave the same in either RowAddrRemap mode. + fn rq_remap_compact() -> RowAddrRemap { + use lance_core::utils::row_addr_remap::GroupInput; + use roaring::RoaringTreemap; + RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter(0u64..25), + old_frag_ids: vec![0], + new_frags: vec![(1, 25)], + }]) + .unwrap() + } + + fn rq_remap_explicit() -> RowAddrRemap { + RowAddrRemap::Explicit( + (0u64..25) + .map(|i| (i, Some((1u64 << 32) | i))) + .chain((25u64..50).map(|i| (i, None))) + .collect(), + ) + } + + #[rstest] + #[case(rq_remap_compact())] + #[case(rq_remap_explicit())] + fn test_remap_compact_rewrites_old_row_addrs(#[case] remap: RowAddrRemap) { + let original_codes = make_test_codes(50, 64); + let metadata = make_test_metadata(original_codes.value_length() as usize * 8); + let storage = RabitQuantizationStorage::try_from_batch( + make_test_batch(original_codes), + &metadata, + DistanceType::L2, + None, + ) + .unwrap(); + + let remapped = storage.remap(&remap).unwrap(); + let remapped_batch = remapped.to_batches().unwrap().next().unwrap(); + let remapped_row_ids = remapped_batch[ROW_ID].as_primitive::().values(); + // Rewritten rows 0..25 land at frag 1 offsets 0..25; the rest are dropped. + let expected_row_ids = + UInt64Array::from_iter_values((0..25).map(|i| (1u64 << 32) | i as u64)); + assert_eq!(remapped_row_ids, expected_row_ids.values()); + } + #[test] fn test_remap_preserves_multi_bit_rq_split_columns() { let original_codes = make_test_codes(50, 64); @@ -3589,7 +3636,7 @@ mod tests { mapping.insert(3, None); mapping.insert(4, Some(104)); - let remapped = storage.remap(&mapping).unwrap(); + let remapped = storage.remap(&RowAddrRemap::Explicit(mapping)).unwrap(); let remapped_batch = remapped.to_batches().unwrap().next().unwrap(); let remapped_row_ids = remapped_batch[ROW_ID].as_primitive::().values(); let expected_row_ids = UInt64Array::from_iter_values( diff --git a/rust/lance-index/src/vector/flat/index.rs b/rust/lance-index/src/vector/flat/index.rs index 2c099eb5435..cc6f6d021eb 100644 --- a/rust/lance-index/src/vector/flat/index.rs +++ b/rust/lance-index/src/vector/flat/index.rs @@ -4,7 +4,8 @@ //! Flat Vector Index. //! -use std::collections::{BinaryHeap, HashMap}; +use lance_core::utils::row_addr_remap::RowAddrRemap; +use std::collections::BinaryHeap; use std::sync::Arc; use arrow::array::AsArray; @@ -312,7 +313,7 @@ impl IvfSubIndex for FlatIndex { Ok(Self {}) } - fn remap(&self, _: &HashMap>, _: &impl VectorStore) -> Result { + fn remap(&self, _: &RowAddrRemap, _: &impl VectorStore) -> Result { Ok(self.clone()) } diff --git a/rust/lance-index/src/vector/hnsw/builder.rs b/rust/lance-index/src/vector/hnsw/builder.rs index 214750dfafa..1712e1326f3 100644 --- a/rust/lance-index/src/vector/hnsw/builder.rs +++ b/rust/lance-index/src/vector/hnsw/builder.rs @@ -10,6 +10,7 @@ use arrow_array::{ArrayRef, Float32Array, ListArray, RecordBatch, UInt64Array}; use crossbeam_queue::ArrayQueue; use itertools::Itertools; use lance_core::deepsize::DeepSizeOf; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_linalg::distance::DistanceType; @@ -1222,7 +1223,7 @@ impl IvfSubIndex for HNSW { fn remap( &self, - _mapping: &HashMap>, // we don't need the mapping here because we rebuild the graph from remapped storage + _mapping: &RowAddrRemap, // we don't need the mapping here because we rebuild the graph from remapped storage store: &impl VectorStore, ) -> Result { // We can't simply remap the row ids in the graph because the vectors are changed, diff --git a/rust/lance-index/src/vector/hnsw/index.rs b/rust/lance-index/src/vector/hnsw/index.rs index 0ae42f59414..6df1f88cc17 100644 --- a/rust/lance-index/src/vector/hnsw/index.rs +++ b/rust/lance-index/src/vector/hnsw/index.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ any::Any, - collections::HashMap, fmt::{Debug, Formatter}, sync::Arc, }; @@ -312,7 +312,7 @@ impl VectorIndex for HNSWIndex { Box::new(self.storage.as_ref().unwrap().row_ids()) } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { Err(Error::index( "Remapping HNSW in this way not supported".to_string(), )) diff --git a/rust/lance-index/src/vector/pq/storage.rs b/rust/lance-index/src/vector/pq/storage.rs index 68747713aac..b9148b6d19f 100644 --- a/rust/lance-index/src/vector/pq/storage.rs +++ b/rust/lance-index/src/vector/pq/storage.rs @@ -5,9 +5,9 @@ //! //! Used as storage backend for Graph based algorithms. +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ cmp::min, - collections::HashMap, sync::{Arc, OnceLock}, }; @@ -545,16 +545,16 @@ impl QuantizerStorage for ProductQuantizationStorage { // we can't use the default implementation of remap, // because PQ Storage transposed the PQ codes - fn remap(&self, mapping: &HashMap>) -> Result { + fn remap(&self, mapping: &RowAddrRemap) -> Result { let transposed_codes = self.pq_code.values(); let mut new_row_ids = Vec::with_capacity(self.len()); let mut new_codes = Vec::with_capacity(self.len() * self.metadata.num_sub_vectors); let row_ids = self.row_ids.values(); for (i, row_id) in row_ids.iter().enumerate() { - match mapping.get(row_id) { + match mapping.get(*row_id) { Some(Some(new_id)) => { - new_row_ids.push(*new_id); + new_row_ids.push(new_id); new_codes.extend(get_pq_code( transposed_codes, self.metadata.nbits, @@ -1149,6 +1149,7 @@ mod tests { use lance_arrow::FixedSizeListArrayExt; use lance_core::ROW_ID_FIELD; use rand::Rng; + use rstest::rstest; const DIM: usize = 32; const TOTAL: usize = 512; @@ -1295,21 +1296,40 @@ mod tests { assert!((storage.dist_between(u, v) - expected).abs() < 1e-4); } + // The first half of the rows is rewritten in order into frag 1; the second + // half is deleted. remap must behave the same in either RowAddrRemap mode. + fn pq_remap_compact() -> RowAddrRemap { + use lance_core::utils::row_addr_remap::GroupInput; + use roaring::RoaringTreemap; + RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: RoaringTreemap::from_iter((0..TOTAL / 2).map(|i| i as u64)), + old_frag_ids: vec![0], + new_frags: vec![(1, (TOTAL / 2) as u32)], + }]) + .unwrap() + } + + fn pq_remap_explicit() -> RowAddrRemap { + RowAddrRemap::Explicit( + (0..TOTAL / 2) + .map(|i| (i as u64, Some((1u64 << 32) | i as u64))) + .chain((TOTAL / 2..TOTAL).map(|i| (i as u64, None))) + .collect(), + ) + } + + #[rstest] + #[case(pq_remap_compact())] + #[case(pq_remap_explicit())] #[tokio::test] - async fn test_remap_with_extra_column() { + async fn test_remap_with_extra_column(#[case] remap: RowAddrRemap) { let storage = create_pq_storage_with_extra_column().await; - let mut mapping = HashMap::new(); - for i in 0..TOTAL / 2 { - mapping.insert(i as u64, Some((TOTAL + i) as u64)); - } - for i in TOTAL / 2..TOTAL { - mapping.insert(i as u64, None); - } - let new_storage = storage.remap(&mapping).unwrap(); + let new_storage = storage.remap(&remap).unwrap(); assert_eq!(new_storage.len(), TOTAL / 2); assert_eq!(new_storage.row_ids.len(), TOTAL / 2); for (i, row_id) in new_storage.row_ids().enumerate() { - assert_eq!(*row_id, (TOTAL + i) as u64); + // Rewritten row i lands at offset i of frag 1. + assert_eq!(*row_id, (1u64 << 32) | i as u64); } assert_eq!(new_storage.batch.num_columns(), 2); assert!(new_storage.batch.column_by_name(ROW_ID).is_some()); diff --git a/rust/lance-index/src/vector/quantizer.rs b/rust/lance-index/src/vector/quantizer.rs index 8ee64669f32..6f4b191098b 100644 --- a/rust/lance-index/src/vector/quantizer.rs +++ b/rust/lance-index/src/vector/quantizer.rs @@ -2,9 +2,10 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use core::fmt; +use lance_core::utils::row_addr_remap::RowAddrRemap; +use std::fmt::Debug; use std::str::FromStr; use std::sync::Arc; -use std::{collections::HashMap, fmt::Debug}; use arrow::{array::AsArray, compute::concat_batches, datatypes::UInt64Type}; use arrow_array::{Array, ArrayRef, FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array}; @@ -240,7 +241,7 @@ pub trait QuantizerStorage: Clone + Sized + DeepSizeOf + VectorStore { fn metadata(&self) -> &Self::Metadata; - fn remap(&self, mapping: &HashMap>) -> Result { + fn remap(&self, mapping: &RowAddrRemap) -> Result { let batches = self .to_batches()? .map(|b| { @@ -249,10 +250,10 @@ pub trait QuantizerStorage: Clone + Sized + DeepSizeOf + VectorStore { let row_ids = b.column(0).as_primitive::().values(); for (i, row_id) in row_ids.iter().enumerate() { - match mapping.get(row_id) { + match mapping.get(*row_id) { Some(Some(new_id)) => { indices.push(i as u32); - new_row_ids.push(*new_id); + new_row_ids.push(new_id); } Some(None) => {} None => { diff --git a/rust/lance-index/src/vector/v3/subindex.rs b/rust/lance-index/src/vector/v3/subindex.rs index 9a49bc95f1d..9e82921347f 100644 --- a/rust/lance-index/src/vector/v3/subindex.rs +++ b/rust/lance-index/src/vector/v3/subindex.rs @@ -1,7 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::collections::{BinaryHeap, HashMap}; +use lance_core::utils::row_addr_remap::RowAddrRemap; +use std::collections::BinaryHeap; use std::fmt::Debug; use std::sync::Arc; @@ -106,7 +107,7 @@ pub trait IvfSubIndex: Send + Sync + Debug + DeepSizeOf { where Self: Sized; - fn remap(&self, mapping: &HashMap>, store: &impl VectorStore) -> Result + fn remap(&self, mapping: &RowAddrRemap, store: &impl VectorStore) -> Result where Self: Sized; diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index 770c68b89e9..7cac7815125 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -3,7 +3,8 @@ pub mod frag_reuse; -use std::collections::{HashMap, HashSet}; +use lance_core::utils::row_addr_remap::RowAddrRemap; +use std::collections::HashSet; use std::sync::Arc; use crate::Dataset; @@ -47,7 +48,7 @@ impl DatasetIndexRemapper { async fn remap_index( &self, index: &IndexMetadata, - mapping: &HashMap>, + mapping: &RowAddrRemap, ) -> Result { remap_index(&self.dataset, &index.uuid, mapping).await } @@ -57,7 +58,7 @@ impl DatasetIndexRemapper { impl IndexRemapper for DatasetIndexRemapper { async fn remap_indices( &self, - mapping: HashMap>, + mapping: RowAddrRemap, affected_fragment_ids: &[u64], ) -> Result> { let affected_frag_ids = HashSet::::from_iter(affected_fragment_ids.iter().copied()); @@ -175,8 +176,6 @@ impl LanceIndexStoreExt for LanceIndexStore { #[cfg(test)] mod tests { - use std::collections::HashMap; - use super::*; use crate::dataset::WriteParams; use crate::index::DatasetIndexExt; @@ -299,7 +298,7 @@ mod tests { .create_remapper(&dataset) .unwrap(); let remapped = remapper - .remap_indices(HashMap::new(), &[target_fragments[0].id() as u64]) + .remap_indices(RowAddrRemap::empty(), &[target_fragments[0].id() as u64]) .await .unwrap(); diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index d591e42cc73..d90b167a08b 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -80,6 +80,7 @@ //! the successful tasks can be committed. You can also commit in batches if //! you wish. As long as the tasks don't rewrite any of the same fragments, //! they can be committed in any order. +use lance_core::utils::row_addr_remap::{GroupInput, RowAddrRemap}; use std::borrow::Cow; use std::collections::HashMap; use std::io::Cursor; @@ -1927,7 +1928,7 @@ pub async fn commit_compaction( let mut rewrite_groups = Vec::with_capacity(completed_tasks.len()); let mut metrics = CompactionMetrics::default(); - let mut row_id_map: HashMap> = HashMap::default(); + let mut remap_group_inputs: Vec = Vec::new(); let mut frag_reuse_groups: Vec = Vec::new(); let mut new_fragment_bitmap: RoaringBitmap = RoaringBitmap::new(); @@ -1942,12 +1943,41 @@ pub async fn commit_compaction( if let Some(row_addrs_bytes) = task.row_addrs { let row_addrs = RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?; - let transposed = remapping::transpose_row_addrs( - row_addrs, - &task.original_fragments, - &task.new_fragments, - ); - row_id_map.extend(transposed); + let new_frags = task + .new_fragments + .iter() + .map(|f| { + let physical_rows = f.physical_rows.ok_or_else(|| { + Error::invalid_input(format!( + "compacted fragment {} is missing physical_rows", + f.id + )) + })?; + Ok((f.id as u32, physical_rows as u32)) + }) + .collect::>>()?; + + let new_rows: u64 = new_frags.iter().map(|(_, rows)| *rows as u64).sum(); + if row_addrs.len() != new_rows { + return Err(Error::invalid_input(format!( + "compaction task rewrote fragments {:?} into {} rows but recorded {} rewritten old row addresses", + task.original_fragments + .iter() + .map(|f| f.id) + .collect::>(), + new_rows, + row_addrs.len() + ))); + } + remap_group_inputs.push(GroupInput { + rewritten_old_row_addrs: row_addrs, + old_frag_ids: task + .original_fragments + .iter() + .map(|f| f.id as u32) + .collect(), + new_frags, + }); } } else if options.defer_index_remap { let changed_row_addrs = task.row_addrs.ok_or_else(|| { @@ -1976,7 +2006,7 @@ pub async fn commit_compaction( .collect::>(); let remapped_indices = index_remapper - .remap_indices(row_id_map, &affected_ids) + .remap_indices(RowAddrRemap::compact(remap_group_inputs)?, &affected_ids) .await?; remapped_indices .into_iter() @@ -2201,18 +2231,26 @@ mod tests { impl IndexRemapper for MockIndexRemapper { async fn remap_indices( &self, - index_map: HashMap>, + index_map: RowAddrRemap, _: &[u64], ) -> Result> { for expectation in &self.expectations { - if expectation.expected == index_map { + let expected_frags: RoaringBitmap = expectation + .expected + .keys() + .map(|addr| (addr >> 32) as u32) + .collect(); + if index_map.affected_fragments() == expected_frags + && expectation + .expected + .iter() + .all(|(k, v)| index_map.get(*k) == Some(*v)) + { return Ok(expectation.answer.clone()); } } panic!( - "Unexpected index map (len={}): {}\n Options: {}", - index_map.len(), - Self::stringify_map(&index_map), + "Unexpected index map; expected one of:\n {}", self.expectations .iter() .map(|expectation| Self::stringify_map(&expectation.expected)) @@ -2697,11 +2735,7 @@ mod tests { #[async_trait] impl IndexRemapper for IgnoreRemap { - async fn remap_indices( - &self, - _: HashMap>, - _: &[u64], - ) -> Result> { + async fn remap_indices(&self, _: RowAddrRemap, _: &[u64]) -> Result> { Ok(Vec::new()) } } diff --git a/rust/lance/src/dataset/optimize/remapping.rs b/rust/lance/src/dataset/optimize/remapping.rs index dab62bf6166..0a9bf0d784f 100644 --- a/rust/lance/src/dataset/optimize/remapping.rs +++ b/rust/lance/src/dataset/optimize/remapping.rs @@ -12,6 +12,7 @@ use crate::{Dataset, index}; use async_trait::async_trait; use lance_core::Error; use lance_core::utils::address::RowAddress; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_index::frag_reuse::{FRAG_REUSE_INDEX_NAME, FragDigest}; use lance_table::format::{Fragment, IndexFile, IndexMetadata}; use lance_table::io::manifest::read_manifest_indexes; @@ -51,7 +52,7 @@ pub struct RemappedIndex { pub trait IndexRemapper: Send + Sync { async fn remap_indices( &self, - index_map: HashMap>, + index_map: RowAddrRemap, affected_fragment_ids: &[u64], ) -> Result>; } @@ -69,11 +70,7 @@ pub struct IgnoreRemap {} #[async_trait] impl IndexRemapper for IgnoreRemap { - async fn remap_indices( - &self, - _: HashMap>, - _: &[u64], - ) -> Result> { + async fn remap_indices(&self, _: RowAddrRemap, _: &[u64]) -> Result> { Ok(Vec::new()) } } @@ -222,7 +219,7 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { // Sequentially apply the row addr maps from oldest to latest let mut curr_index_id = *index_id; - for (i, row_id_map) in frag_reuse_index.row_id_maps.iter().enumerate() { + for (i, row_id_map) in frag_reuse_index.row_id_maps.into_iter().enumerate() { let version = &frag_reuse_index.details.versions[i]; // load on-disk index metadata before auto-remap let curr_index_meta = read_manifest_indexes( @@ -271,7 +268,8 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { }; if should_remap { - let remap_result = index::remap_index(dataset, &curr_index_id, row_id_map).await?; + let remapper = RowAddrRemap::Explicit(row_id_map); + let remap_result = index::remap_index(dataset, &curr_index_id, &remapper).await?; let new_index_meta = match remap_result { RemapResult::Drop => continue, @@ -369,6 +367,91 @@ pub async fn remap_column_index( mod tests { use super::*; + /// The compact remap must agree with the materialized + /// `transpose_row_ids_from_digest` (the defer-path implementation) on every + /// real address — cross-checking rank/select against an independent map so a + /// shared mistake between the implementation and its unit test can't hide. + #[test] + fn test_compact_matches_transpose() { + use lance_core::utils::row_addr_remap::GroupInput; + // Ascending old fragments (compaction's scan order), with deletions. + let old = vec![ + FragDigest { + id: 0, + physical_rows: 5, + num_deleted_rows: 2, + }, + FragDigest { + id: 1, + physical_rows: 4, + num_deleted_rows: 1, + }, + FragDigest { + id: 3, + physical_rows: 3, + num_deleted_rows: 0, + }, + ]; + // 9 rewritten rows (offsets that survived in each old fragment). + let rewritten = [ + (0, 1), + (0, 2), + (0, 4), + (1, 0), + (1, 1), + (1, 3), + (3, 0), + (3, 1), + (3, 2), + ]; + let addrs = RoaringTreemap::from_iter( + rewritten + .iter() + .map(|(f, o)| u64::from(RowAddress::new_from_parts(*f, *o))), + ); + // 9 rewritten rows split across two new fragments. + let new = vec![ + FragDigest { + id: 10, + physical_rows: 4, + num_deleted_rows: 0, + }, + FragDigest { + id: 11, + physical_rows: 5, + num_deleted_rows: 0, + }, + ]; + + let expected = transpose_row_ids_from_digest(addrs.clone(), &old, &new); + let compact = RowAddrRemap::compact([GroupInput { + rewritten_old_row_addrs: addrs, + old_frag_ids: old.iter().map(|f| f.id as u32).collect(), + new_frags: new + .iter() + .map(|f| (f.id as u32, f.physical_rows as u32)) + .collect(), + }]) + .unwrap(); + + // Every real address in the old fragments must map identically. + for f in &old { + for o in 0..f.physical_rows as u32 { + let a = u64::from(RowAddress::new_from_parts(f.id as u32, o)); + assert_eq!( + compact.get(a), + expected.get(&a).copied(), + "mismatch at ({}, {})", + f.id, + o + ); + } + } + // A fragment outside the group is unaffected by both. + let outside = u64::from(RowAddress::new_from_parts(99, 0)); + assert_eq!(compact.get(outside), expected.get(&outside).copied()); + } + #[test] fn test_missing_indices() { // Sanity test to make sure MissingIds works. Does not test actual functionality so diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 78e5c429527..64455185ba4 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -4,6 +4,7 @@ //! Secondary Index //! +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, OnceLock}; @@ -15,7 +16,6 @@ use itertools::Itertools; use lance_core::cache::CacheKey; use lance_core::datatypes::Field; use lance_core::datatypes::Schema as LanceSchema; -use lance_core::utils::address::RowAddress; use lance_core::utils::parse::parse_env_as_bool; use lance_core::utils::tracing::{ IO_TYPE_OPEN_FRAG_REUSE, IO_TYPE_OPEN_MEM_WAL, IO_TYPE_OPEN_VECTOR, TRACE_IO_EVENTS, @@ -471,7 +471,7 @@ pub trait IndexBuilder { pub(crate) async fn remap_index( dataset: &Dataset, index_id: &Uuid, - row_id_map: &HashMap>, + row_id_map: &RowAddrRemap, ) -> Result { // Load indices from the disk. let indices = dataset.load_indices().await?; @@ -486,20 +486,14 @@ pub(crate) async fn remap_index( )); } - if row_id_map.values().all(|v| v.is_none()) { - let deleted_bitmap = RoaringBitmap::from_iter( - row_id_map - .keys() - .map(|row_id| RowAddress::new_from_u64(*row_id)) - .map(|addr| addr.fragment_id()), - ); - if Some(deleted_bitmap) == matched.fragment_bitmap { - // If remap deleted all rows, we can just return the same index ID. - // This can happen if there is a bug where the index is covering empty - // fragment that haven't been cleaned up. They should be cleaned up - // outside of this function. - return Ok(RemapResult::Keep(*index_id)); - } + if let Some(deleted_bitmap) = row_id_map.fully_deleted_fragments() + && Some(deleted_bitmap) == matched.fragment_bitmap + { + // If remap deleted all rows, we can just return the same index ID. + // This can happen if there is a bug where the index is covering empty + // fragment that haven't been cleaned up. They should be cleaned up + // outside of this function. + return Ok(RemapResult::Keep(*index_id)); } let field_id = matched @@ -4099,9 +4093,13 @@ mod tests { let remap_to_empty = (0..dataset.count_all_rows().await.unwrap()) .map(|i| (i as u64, None)) .collect::>(); - let new_uuid = remap_index(&dataset, &index_uuid, &remap_to_empty) - .await - .unwrap(); + let new_uuid = remap_index( + &dataset, + &index_uuid, + &RowAddrRemap::Explicit(remap_to_empty), + ) + .await + .unwrap(); assert_eq!(new_uuid, RemapResult::Keep(index_uuid)); } diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index 75465cc817c..647eef74809 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -3,6 +3,7 @@ //! Query-time logical views over scalar index segments. +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::any::Any; use std::sync::Arc; @@ -138,7 +139,7 @@ impl ScalarIndex for LogicalScalarIndex { async fn remap( &self, - _mapping: &std::collections::HashMap>, + _mapping: &RowAddrRemap, _dest_store: &dyn lance_index::scalar::IndexStore, ) -> Result { Err(Error::invalid_input(format!( diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 0eb66ea2ede..cb6c76418c8 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -4,6 +4,7 @@ //! Vector Index for Fast Approximate Nearest Neighbor (ANN) Search //! +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::sync::Arc; use std::{any::Any, collections::HashMap}; @@ -1486,14 +1487,14 @@ pub(crate) async fn build_empty_vector_index( )) } -#[instrument(level = "debug", skip_all, fields(old_uuid = old_uuid.to_string(), new_uuid = new_uuid.to_string(), num_rows = mapping.len()))] +#[instrument(level = "debug", skip_all, fields(old_uuid = old_uuid.to_string(), new_uuid = new_uuid.to_string()))] pub(crate) async fn remap_vector_index( dataset: Arc, column: &str, old_uuid: &Uuid, new_uuid: &Uuid, old_metadata: &IndexMetadata, - mapping: &HashMap>, + mapping: &RowAddrRemap, ) -> Result> { let old_index = dataset .open_vector_index(column, old_uuid, &NoOpMetricsCollector) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 1e4fec8c762..bf968d9744f 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::collections::HashSet; use std::sync::{Arc, Mutex}; use std::{collections::HashMap, pin::Pin}; @@ -332,7 +333,7 @@ impl IvfIndexBuilder }) } - pub async fn remap(&mut self, mapping: &HashMap>) -> Result> { + pub async fn remap(&mut self, mapping: &RowAddrRemap) -> Result> { if self.existing_indices.is_empty() { return Err(Error::invalid_input( "No existing indices available for remapping", diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index 91d5c434dd1..5aed016f477 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -5,10 +5,10 @@ #[cfg(test)] mod test { + use lance_core::utils::row_addr_remap::RowAddrRemap; use std::{ any::Any, cell::OnceCell, - collections::HashMap, sync::{Arc, Mutex}, }; @@ -153,7 +153,7 @@ mod test { todo!("this method is for only IVF_HNSW_* index"); } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { Ok(()) } diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index fb01339ead9..8796552c7a2 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -44,6 +44,7 @@ use futures::{ use io::write_hnsw_quantization_index_partitions; use lance_arrow::*; use lance_core::deepsize::DeepSizeOf; +use lance_core::utils::row_addr_remap::RowAddrRemap; use lance_core::{ Error, ROW_ID_FIELD, Result, cache::{LanceCache, UnsizedCacheKey, WeakLanceCache}, @@ -1248,7 +1249,7 @@ impl VectorIndex for IVFIndex { todo!("this method is for only IVF_HNSW_* index"); } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { // This will be needed if we want to clean up IVF to allow more than just // one layer (e.g. IVF -> IVF -> PQ). We need to pass on the call to // remap to the lower layers. @@ -1723,7 +1724,7 @@ impl RemapPageTask { mut self, reader: Arc, index: &IVFIndex, - mapping: &HashMap>, + mapping: &RowAddrRemap, ) -> Result { let mut page = index .sub_index @@ -1769,7 +1770,7 @@ pub(crate) async fn remap_index_file_v3( dataset: &Dataset, new_uuid: &Uuid, index: Arc, - mapping: &HashMap>, + mapping: &RowAddrRemap, column: String, ) -> Result> { let dataset = dataset.clone(); @@ -1868,7 +1869,7 @@ pub(crate) async fn remap_index_file( new_uuid: &Uuid, old_version: u64, index: &IVFIndex, - mapping: &HashMap>, + mapping: &RowAddrRemap, name: String, column: String, transforms: Vec, @@ -5147,7 +5148,7 @@ mod tests { &new_uuid, dataset_mut.version().version, ivf_index, - &mapping, + &RowAddrRemap::Explicit(mapping), INDEX_NAME.to_string(), WellKnownIvfPqData::COLUMN.to_string(), vec![], diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 29d9e224970..dda55a9b59c 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -3,6 +3,7 @@ //! IVF - Inverted File index. +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::io::Write as IoWrite; use std::marker::PhantomData; use std::{ @@ -1812,7 +1813,7 @@ impl VectorIndex for IVFInd todo!("this method is for only IVF_HNSW_* index"); } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { Err(Error::index( "Remapping IVF in this way not supported".to_string(), )) diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index a661a314b4d..389ec56c05a 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -1,8 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use lance_core::utils::row_addr_remap::RowAddrRemap; +use std::any::Any; use std::sync::Arc; -use std::{any::Any, collections::HashMap}; use arrow::compute::concat; use arrow_array::types::UInt64Type; @@ -442,14 +443,14 @@ impl VectorIndex for PQIndex { todo!("this method is for only IVF_HNSW_* index"); } - async fn remap(&mut self, mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, mapping: &RowAddrRemap) -> Result<()> { let num_vectors = self.row_ids.as_ref().unwrap().len(); let row_ids = self.row_ids.as_ref().unwrap().values().iter(); let transposed_codes = self.code.as_ref().unwrap(); let remapped = row_ids .enumerate() .filter_map(|(vec_idx, old_row_id)| { - let new_row_id = mapping.get(old_row_id).cloned(); + let new_row_id = mapping.get(*old_row_id); // If the row id is not in the mapping then this row is not remapped and we keep as is let new_row_id = new_row_id.unwrap_or(Some(*old_row_id)); new_row_id.map(|new_row_id| { @@ -648,6 +649,7 @@ pub(crate) fn build_pq_storage( mod tests { use super::*; + use std::collections::HashMap; use std::{ops::Range, sync::Mutex}; use arrow::datatypes::Float32Type; diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 0ceddf7c5ee..bb9b35c2c86 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +#[cfg(test)] +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::any::Any; use std::cmp::Ordering as CmpOrdering; use std::collections::{BinaryHeap, HashMap, HashSet}; @@ -2100,7 +2102,7 @@ mod tests { Box::new(self.row_ids.iter()) } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { Ok(()) } @@ -2322,7 +2324,7 @@ mod tests { Box::new(self.row_ids.iter()) } - async fn remap(&mut self, _mapping: &HashMap>) -> Result<()> { + async fn remap(&mut self, _mapping: &RowAddrRemap) -> Result<()> { Ok(()) } diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index de9e61b5f8f..d76eda1002d 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +#[cfg(test)] +use lance_core::utils::row_addr_remap::RowAddrRemap; use std::sync::Arc; use lance_core::Result; @@ -62,7 +64,6 @@ mod test { use std::{ any::Any, - collections::HashMap, sync::{Arc, atomic::AtomicBool}, }; @@ -186,7 +187,7 @@ mod test { unimplemented!() } - async fn remap(&mut self, _: &HashMap>) -> Result<()> { + async fn remap(&mut self, _: &RowAddrRemap) -> Result<()> { Ok(()) }