Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions rust/lance-index/src/vector/bq/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2155,6 +2155,38 @@ pub fn unpack_codes(codes: &FixedSizeListArray) -> FixedSizeListArray {
FixedSizeListArray::try_new_from_values(UInt8Array::from(unpacked), code_len as i32).unwrap()
}

/// Derives the row-id remap table that a fragment-reuse index implies for the
/// row ids held by this partition, following the same approach as the PQ
/// storage frag-reuse path.
///
/// Yields `None` whenever no remapping is required: no fragment-reuse index was
/// supplied, its `row_id_maps` is empty, or every row id present maps to
/// itself. This lets callers stay on the zero-cost no-op path.
///
/// Otherwise yields a `HashMap` with one entry per affected old row id:
/// `Some(new_id)` when the row survives under a different id, or `None` when
/// the fragment covering the row was compacted away. The map has the shape
/// expected by `RabitQuantizationStorage::remap`.
fn build_frag_reuse_mapping(
    fri: Option<&FragReuseIndex>,
    row_ids: &UInt64Array,
) -> Option<HashMap<u64, Option<u64>>> {
    // Bail out early when there is no index, or the index carries no maps.
    let index = fri.filter(|index| !index.row_id_maps.is_empty())?;

    // Keep only the row ids whose remapped value differs from the original;
    // ids that map to themselves need no entry.
    let remapped: HashMap<u64, Option<u64>> = row_ids
        .values()
        .iter()
        .copied()
        .filter_map(|old_id| {
            let target = index.remap_row_id(old_id);
            (target != Some(old_id)).then_some((old_id, target))
        })
        .collect();

    (!remapped.is_empty()).then_some(remapped)
}

#[async_trait]
impl QuantizerStorage for RabitQuantizationStorage {
type Metadata = RabitQuantizationMetadata;
Expand All @@ -2163,7 +2195,7 @@ impl QuantizerStorage for RabitQuantizationStorage {
batch: RecordBatch,
metadata: &Self::Metadata,
distance_type: DistanceType,
_fri: Option<Arc<FragReuseIndex>>,
fri: Option<Arc<FragReuseIndex>>,
) -> Result<Self> {
let distance_type = match (metadata.query_estimator, distance_type) {
(RabitQueryEstimator::RawQuery, DistanceType::Cosine) => DistanceType::L2,
Expand Down Expand Up @@ -2272,7 +2304,7 @@ impl QuantizerStorage for RabitQuantizationStorage {
metadata.packed = true;
let packed_ex_codes = maybe_pack_ex_codes(ex_codes.as_ref(), ex_bits);

Ok(Self {
let storage = Self {
metadata,
batch,
distance_type,
Expand All @@ -2285,7 +2317,12 @@ impl QuantizerStorage for RabitQuantizationStorage {
packed_ex_codes,
ex_add_factors,
ex_scale_factors,
})
};

match build_frag_reuse_mapping(fri.as_deref(), &storage.row_ids) {
Some(mapping) => storage.remap(&mapping),
None => Ok(storage),
}
}

fn metadata(&self) -> &Self::Metadata {
Expand Down
119 changes: 119 additions & 0 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4615,6 +4615,125 @@ mod tests {
);
}

/// Checks that an IVF + RQ vector index written with file version V3 can still
/// be queried after a compaction that defers index remapping.
///
/// Flow: build an in-memory dataset of 6 fragments x 1000 rows, snapshot the
/// stored vectors, create the index, compact with `defer_index_remap: true`,
/// confirm the index UUID is unchanged and a fragment-reuse index was recorded,
/// then assert top-1 self-recall for a sample of the stored vectors.
#[tokio::test]
async fn test_read_ivf_rq_index_v3_with_defer_index_remap() {
    use arrow_array::cast::AsArray;
    use lance_index::vector::bq::RQBuildParams;

    let mut dataset = lance_datagen::gen_batch()
        .col(
            "vec",
            lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
        )
        .into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
        .await
        .unwrap();

    // Snapshot every stored vector up front so each can later serve as a query.
    let stored: Vec<Vec<f32>> = {
        let mut scan = dataset.scan();
        scan.project(&["vec"]).unwrap();
        let stream = scan.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
        let rows: Vec<Vec<f32>> = batches
            .iter()
            .flat_map(|batch| {
                let vectors = batch["vec"].as_fixed_size_list();
                (0..batch.num_rows()).map(move |row| {
                    let row_values = vectors.value(row);
                    let row_values = row_values.as_primitive::<Float32Type>();
                    row_values.values().to_vec()
                })
            })
            .collect();
        rows
    };

    // Index definition: a 2-partition IVF stage followed by an RQ stage.
    let ivf_stage = StageParams::Ivf(IvfBuildParams {
        max_iters: 2,
        num_partitions: Some(2),
        sample_rate: 2,
        ..Default::default()
    });
    let rq_stage = StageParams::RQ(RQBuildParams::new(1));
    let index_params = VectorIndexParams {
        metric_type: DistanceType::L2,
        stages: vec![ivf_stage, rq_stage],
        version: crate::index::vector::IndexFileVersion::V3,
        skip_transpose: false,
        runtime_hints: Default::default(),
    };
    dataset
        .create_index(
            &["vec"],
            IndexType::Vector,
            Some("vec_idx".into()),
            &index_params,
            false,
        )
        .await
        .unwrap();

    let indices_before = dataset.load_indices().await.unwrap();
    let original_index = indices_before
        .iter()
        .find(|idx| idx.name == "vec_idx")
        .unwrap();

    // Compact while deferring the index remap; fragments must actually be rewritten.
    let compaction_options = CompactionOptions {
        target_rows_per_fragment: 2_000,
        defer_index_remap: true,
        ..Default::default()
    };
    let compaction = compact_files(&mut dataset, compaction_options, None)
        .await
        .unwrap();
    assert!(compaction.fragments_removed > 0);
    assert!(compaction.fragments_added > 0);

    // The vector index itself must not have been rewritten by the compaction.
    let current_index = match dataset.load_index_by_name("vec_idx").await.unwrap() {
        Some(idx) => idx,
        None => panic!("vec index must be available"),
    };
    assert_eq!(current_index.uuid, original_index.uuid);

    let indices_after = dataset.load_indices().await.unwrap();
    let frag_reuse_present = indices_after
        .iter()
        .any(|idx| idx.name == FRAG_REUSE_INDEX_NAME);
    assert!(
        frag_reuse_present,
        "defer_index_remap must record a {} index",
        FRAG_REUSE_INDEX_NAME
    );

    // Query with a sample of the stored vectors; each one must come back as
    // its own nearest neighbour.
    let stride = (stored.len() / 8).max(1);
    let mut checked = 0;
    for expected in stored.iter().step_by(stride) {
        let query_vec = PrimitiveArray::<Float32Type>::from_iter_values(expected.iter().copied());

        let mut scan = dataset.scan();
        scan.nearest("vec", &query_vec, 5).unwrap();
        scan.project(&["vec"]).unwrap().with_row_id();
        let stream = scan.try_into_stream().await.unwrap();
        let results = stream.try_collect::<Vec<_>>().await.unwrap();

        assert!(!results.is_empty(), "query returned no batches");
        let first_batch = &results[0];
        assert!(first_batch.num_rows() > 0, "query returned empty top batch");

        let nearest = first_batch["vec"].as_fixed_size_list().value(0);
        let nearest = nearest.as_primitive::<Float32Type>();
        assert_eq!(
            nearest.values(),
            expected.as_slice(),
            "top-1 self-recall returned a different vector than the query"
        );
        checked += 1;
    }
    assert!(checked > 0, "expected to check at least one stored vector");
}

#[tokio::test]
async fn test_default_compaction_planner() {
let test_dir = TempStrDir::default();
Expand Down
8 changes: 7 additions & 1 deletion rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1891,9 +1891,15 @@ impl DatasetIndexInternalExt for Dataset {
if let Some(entry) = self.index_cache.get_with_key(&state_key).await {
log::debug!("Found IvfIndexState in cache uuid: {}", uuid);
let partition_cache = self.index_cache.with_key_prefix(&state_key.key());
let frag_reuse_index = self.open_frag_reuse_index(metrics).await?;
return entry
.0
.reconstruct(object_store, self.metadata_cache.as_ref(), partition_cache)
.reconstruct(
object_store,
self.metadata_cache.as_ref(),
partition_cache,
frag_reuse_index,
)
.await;
}

Expand Down
7 changes: 6 additions & 1 deletion rust/lance/src/index/vector/ivf/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub(crate) trait IvfStateEntry: DeepSizeOf + Send + Sync + 'static {
object_store: Arc<ObjectStore>,
file_metadata_cache: &'a LanceCache,
index_cache: LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
) -> BoxFuture<'a, Result<Arc<dyn VectorIndex>>>;
}

Expand Down Expand Up @@ -431,6 +432,7 @@ impl<Q: Quantization + 'static> IvfStateEntry for IvfIndexState<Q> {
object_store: Arc<ObjectStore>,
file_metadata_cache: &'a LanceCache,
index_cache: LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
) -> BoxFuture<'a, Result<Arc<dyn VectorIndex>>> {
Box::pin(async move {
match self.sub_index_type {
Expand All @@ -440,6 +442,7 @@ impl<Q: Quantization + 'static> IvfStateEntry for IvfIndexState<Q> {
object_store,
file_metadata_cache,
index_cache,
frag_reuse_index,
)
.await
}
Expand All @@ -449,6 +452,7 @@ impl<Q: Quantization + 'static> IvfStateEntry for IvfIndexState<Q> {
object_store,
file_metadata_cache,
index_cache,
frag_reuse_index,
)
.await
}
Expand Down Expand Up @@ -1812,6 +1816,7 @@ async fn reconstruct_typed<S: IvfSubIndex + 'static, Q: Quantization + 'static>(
object_store: Arc<ObjectStore>,
file_metadata_cache: &LanceCache,
index_cache: LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
) -> Result<Arc<dyn VectorIndex>> {
let io_parallelism = object_store.io_parallelism();

Expand Down Expand Up @@ -1867,7 +1872,7 @@ async fn reconstruct_typed<S: IvfSubIndex + 'static, Q: Quantization + 'static>(
state.aux_ivf.clone(),
state.metadata.clone(),
state.distance_type,
None,
frag_reuse_index,
);
let rq_search_cache = IVFIndex::<S, Q>::rq_search_cache_from_state(state, &storage)?;

Expand Down
Loading