diff --git a/rust/lance-index/src/vector/bq/storage.rs b/rust/lance-index/src/vector/bq/storage.rs index bd70f176c5d..6e92b2e8903 100644 --- a/rust/lance-index/src/vector/bq/storage.rs +++ b/rust/lance-index/src/vector/bq/storage.rs @@ -2155,6 +2155,38 @@ pub fn unpack_codes(codes: &FixedSizeListArray) -> FixedSizeListArray { FixedSizeListArray::try_new_from_values(UInt8Array::from(unpacked), code_len as i32).unwrap() } +/// Build a row-id remapping for the rows present in this partition from a +/// fragment-reuse index, mirroring the PQ storage frag-reuse path. +/// +/// Returns `None` when there is nothing to do (no fragment-reuse index, or the +/// index leaves every present row id unchanged), so callers keep the zero-cost +/// no-op path. Otherwise, returns a `HashMap` mapping every affected old row id +/// to `Some(new_id)` for surviving rows or `None` for rows whose covering +/// fragment was compacted away, suitable for `RabitQuantizationStorage::remap`. +fn build_frag_reuse_mapping( + fri: Option<&FragReuseIndex>, + row_ids: &UInt64Array, +) -> Option>> { + let fri = fri?; + if fri.row_id_maps.is_empty() { + return None; + } + let mut mapping: HashMap> = HashMap::new(); + for row_id in row_ids.values().iter() { + match fri.remap_row_id(*row_id) { + Some(new_id) if new_id == *row_id => {} + mapped => { + mapping.insert(*row_id, mapped); + } + } + } + if mapping.is_empty() { + None + } else { + Some(mapping) + } +} + #[async_trait] impl QuantizerStorage for RabitQuantizationStorage { type Metadata = RabitQuantizationMetadata; @@ -2163,7 +2195,7 @@ impl QuantizerStorage for RabitQuantizationStorage { batch: RecordBatch, metadata: &Self::Metadata, distance_type: DistanceType, - _fri: Option>, + fri: Option>, ) -> Result { let distance_type = match (metadata.query_estimator, distance_type) { (RabitQueryEstimator::RawQuery, DistanceType::Cosine) => DistanceType::L2, @@ -2272,7 +2304,7 @@ impl QuantizerStorage for RabitQuantizationStorage { metadata.packed = true; let packed_ex_codes = maybe_pack_ex_codes(ex_codes.as_ref(), ex_bits); - Ok(Self { + let storage = Self { metadata, batch, distance_type, @@ -2285,7 +2317,12 @@ impl QuantizerStorage for RabitQuantizationStorage { packed_ex_codes, ex_add_factors, ex_scale_factors, - }) + }; + + match build_frag_reuse_mapping(fri.as_deref(), &storage.row_ids) { + Some(mapping) => storage.remap(&mapping), + None => Ok(storage), + } } fn metadata(&self) -> &Self::Metadata { diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index d591e42cc73..4443289f4be 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -4615,6 +4615,125 @@ mod tests { ); } + #[tokio::test] + async fn test_read_ivf_rq_index_v3_with_defer_index_remap() { + use arrow_array::cast::AsArray; + use lance_index::vector::bq::RQBuildParams; + + let mut dataset = lance_datagen::gen_batch() + .col( + "vec", + lance_datagen::array::rand_vec::(Dimension::from(128)), + ) + .into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000)) + .await + .unwrap(); + + let stored: Vec> = { + let mut scanner = dataset.scan(); + scanner.project(&["vec"]).unwrap(); + let batches = scanner + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let mut out = Vec::new(); + for batch in &batches { + let vecs = batch["vec"].as_fixed_size_list(); + for i in 0..batch.num_rows() { + let values = vecs.value(i); + let values = values.as_primitive::(); + out.push(values.values().to_vec()); + } + } + out + }; + + let index_name = Some("vec_idx".into()); + dataset + .create_index( + &["vec"], + IndexType::Vector, + index_name.clone(), + &VectorIndexParams { + metric_type: DistanceType::L2, + stages: vec![ + StageParams::Ivf(IvfBuildParams { + max_iters: 2, + num_partitions: Some(2), + sample_rate: 2, + ..Default::default() + }), + StageParams::RQ(RQBuildParams::new(1)), + ], + version: crate::index::vector::IndexFileVersion::V3, + skip_transpose: false, + runtime_hints: Default::default(), + }, + false, + ) + .await + .unwrap(); + let indices = dataset.load_indices().await.unwrap(); + let original_index = indices.iter().find(|idx| idx.name == "vec_idx").unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 2_000, + defer_index_remap: true, + ..Default::default() + }; + let metrics = compact_files(&mut dataset, options, None).await.unwrap(); + assert!(metrics.fragments_removed > 0); + assert!(metrics.fragments_added > 0); + + let Some(current_index) = dataset.load_index_by_name("vec_idx").await.unwrap() else { + panic!("vec index must be available"); + }; + assert_eq!(current_index.uuid, original_index.uuid); + + let frag_reuse_present = dataset + .load_indices() + .await + .unwrap() + .iter() + .any(|idx| idx.name == FRAG_REUSE_INDEX_NAME); + assert!( + frag_reuse_present, + "defer_index_remap must record a {} index", + FRAG_REUSE_INDEX_NAME + ); + + let sample_step = (stored.len() / 8).max(1); + let mut checked = 0; + for query in stored.iter().step_by(sample_step) { + let query_vec = PrimitiveArray::::from_iter_values(query.iter().copied()); + let mut scanner = dataset.scan(); + scanner.nearest("vec", &query_vec, 5).unwrap(); + scanner.project(&["vec"]).unwrap().with_row_id(); + let batches = scanner + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert!(!batches.is_empty(), "query returned no batches"); + let top = &batches[0]; + assert!(top.num_rows() > 0, "query returned empty top batch"); + let top_vec = top["vec"].as_fixed_size_list().value(0); + let top_vec = top_vec.as_primitive::(); + assert_eq!( + top_vec.values(), + query.as_slice(), + "top-1 self-recall returned a different vector than the query" + ); + checked += 1; + } + assert!(checked > 0, "expected to check at least one stored vector"); + } + #[tokio::test] async fn test_default_compaction_planner() { let test_dir = TempStrDir::default(); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 8984d507408..96c57aeb796 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1891,9 +1891,15 @@ impl DatasetIndexInternalExt for Dataset { if let Some(entry) = self.index_cache.get_with_key(&state_key).await { log::debug!("Found IvfIndexState in cache uuid: {}", uuid); let partition_cache = self.index_cache.with_key_prefix(&state_key.key()); + let frag_reuse_index = self.open_frag_reuse_index(metrics).await?; return entry .0 - .reconstruct(object_store, self.metadata_cache.as_ref(), partition_cache) + .reconstruct( + object_store, + self.metadata_cache.as_ref(), + partition_cache, + frag_reuse_index, + ) .await; } diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 4ea076ed420..af4a7b87639 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -249,6 +249,7 @@ pub(crate) trait IvfStateEntry: DeepSizeOf + Send + Sync + 'static { object_store: Arc, file_metadata_cache: &'a LanceCache, index_cache: LanceCache, + frag_reuse_index: Option>, ) -> BoxFuture<'a, Result>>; } @@ -431,6 +432,7 @@ impl IvfStateEntry for IvfIndexState { object_store: Arc, file_metadata_cache: &'a LanceCache, index_cache: LanceCache, + frag_reuse_index: Option>, ) -> BoxFuture<'a, Result>> { Box::pin(async move { match self.sub_index_type { @@ -440,6 +442,7 @@ impl IvfStateEntry for IvfIndexState { object_store, file_metadata_cache, index_cache, + frag_reuse_index, ) .await } @@ -449,6 +452,7 @@ impl IvfStateEntry for IvfIndexState { object_store, file_metadata_cache, index_cache, + frag_reuse_index, ) .await } @@ -1812,6 +1816,7 @@ async fn reconstruct_typed( object_store: Arc, file_metadata_cache: &LanceCache, index_cache: LanceCache, + frag_reuse_index: Option>, ) -> Result> { let io_parallelism = object_store.io_parallelism(); @@ -1867,7 +1872,7 @@ async fn reconstruct_typed( state.aux_ivf.clone(), state.metadata.clone(), state.distance_type, - None, + frag_reuse_index, ); let rq_search_cache = IVFIndex::::rq_search_cache_from_state(state, &storage)?;