From a83479b6efcd2be6928fb9e8ee8109813cc6df6c Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Wed, 10 Jun 2026 11:09:28 +0800 Subject: [PATCH 1/2] feat(index): consolidate bitmap segments and unindexed data on optimize --- rust/lance-index/src/scalar.rs | 9 + rust/lance-index/src/scalar/bitmap.rs | 444 ++------------------------ rust/lance-select/src/mask.rs | 5 + rust/lance/src/index/append.rs | 207 +++++++++++- rust/lance/src/index/create.rs | 217 +++++++------ rust/lance/src/index/scalar.rs | 47 +-- rust/lance/src/index/scalar/bitmap.rs | 82 +++-- rust/lance/src/index/scalar/btree.rs | 28 +- 8 files changed, 415 insertions(+), 624 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 772dfaf4089..2aef324de83 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -928,6 +928,15 @@ impl OldIndexDataFilter { .collect(), } } + + /// Filter a posting list of row addresses in place, retaining only the rows + /// selected by this filter. + pub fn retain_row_addrs(&self, addrs: &mut RowAddrTreeMap) { + match self { + Self::Fragments { to_keep, .. 
} => addrs.retain_fragments_in(to_keep), + Self::RowIds(valid_row_ids) => *addrs &= valid_row_ids, + } + } } impl UpdateCriteria { diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 10254e699c5..7b88e7c5d29 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -3,8 +3,7 @@ use std::{ any::Any, - cmp::Reverse, - collections::{BTreeMap, BinaryHeap, HashMap}, + collections::{BTreeMap, HashMap}, fmt::Debug, ops::Bound, sync::Arc, @@ -29,14 +28,12 @@ use lance_core::{ error::LanceOptionExt, utils::tokio::get_num_compute_intensive_cpus, }; -use lance_io::object_store::ObjectStore; use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; -use object_store::path::Path; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use tracing::{instrument, warn}; -use super::{AnyQuery, IndexStore, ScalarIndex}; +use super::{AnyQuery, IndexStore, OldIndexDataFilter, ScalarIndex}; use super::{ BuiltinIndexType, SargableQuery, ScalarIndexParams, SearchResult, btree::OrderableScalarValue, }; @@ -58,18 +55,10 @@ use crate::{scalar::IndexReader, scalar::expression::ScalarQueryParser}; pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance"; pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats"; -const BITMAP_PART_LOOKUP_PREFIX: &str = "part_"; -const BITMAP_PART_LOOKUP_SUFFIX: &str = "_bitmap_page_lookup.lance"; -const EXPLICIT_SHARD_ID_TAG: u64 = 0; -const IMPLICIT_FRAGMENT_ID_TAG: u64 = 1; const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom const MAX_ROWS_PER_CHUNK: usize = 2 * 1024; -// Smaller than MAX_ROWS_PER_CHUNK to bound the per-cursor in-memory batch -// footprint during a k-way merge (N cursors × chunk), while still amortising -// I/O over a reasonable number of rows per read. 
-const MERGE_ROWS_PER_CHUNK: usize = 512; const BITMAP_INDEX_VERSION: u32 = 0; @@ -883,64 +872,6 @@ impl BitmapBatchWriter { } } -fn bitmap_shard_file_name(partition_id: u64) -> String { - format!("{BITMAP_PART_LOOKUP_PREFIX}{partition_id}{BITMAP_PART_LOOKUP_SUFFIX}") -} - -fn tagged_bitmap_partition_id(id: u32, tag: u64) -> u64 { - ((id as u64) << 32) | tag -} - -fn bitmap_shard_partition_id(fragment_ids: &[u32], shard_id: Option) -> Result { - if fragment_ids.is_empty() { - return Err(Error::invalid_input( - "Bitmap shard build requires at least one fragment id".to_string(), - )); - } - - if let Some(shard_id) = shard_id { - return Ok(tagged_bitmap_partition_id(shard_id, EXPLICIT_SHARD_ID_TAG)); - } - - let [fragment_id] = fragment_ids else { - return Err(Error::invalid_input(format!( - "Bitmap distributed build over multiple fragments requires an explicit shard_id. \ - Received {} fragment ids: {:?}. Please assign mutually exclusive shard_id values \ - to disjoint fragment groups.", - fragment_ids.len(), - fragment_ids - ))); - }; - - Ok(tagged_bitmap_partition_id( - *fragment_id, - IMPLICIT_FRAGMENT_ID_TAG, - )) -} - -fn extract_bitmap_shard_id(filename: &str) -> Result { - let partition_id = filename - .strip_prefix(BITMAP_PART_LOOKUP_PREFIX) - .and_then(|name| name.strip_suffix(BITMAP_PART_LOOKUP_SUFFIX)) - .ok_or_else(|| { - Error::internal(format!("Invalid bitmap shard file name format: {filename}")) - })?; - partition_id.parse::().map_err(|_| { - Error::internal(format!( - "Failed to parse bitmap partition id from file name: {filename}" - )) - }) -} - -fn deserialize_bitmap(bitmap_bytes: &[u8], file_name: &str) -> Result { - RowAddrTreeMap::deserialize_from(bitmap_bytes).map_err(|error| { - Error::corrupt_file( - Path::from(file_name), - format!("Failed to deserialize bitmap bytes: {error}"), - ) - }) -} - async fn new_bitmap_batch_writer( index_store: &dyn IndexStore, file_name: &str, @@ -954,218 +885,6 @@ async fn new_bitmap_batch_writer( 
Ok(BitmapBatchWriter::new(index_file)) } -#[derive(Clone, Debug, Eq, PartialEq)] -struct BitmapHeapItem { - key: OrderableScalarValue, - shard_idx: usize, -} - -impl Ord for BitmapHeapItem { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.key - .cmp(&other.key) - .then_with(|| self.shard_idx.cmp(&other.shard_idx)) - } -} - -impl PartialOrd for BitmapHeapItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -struct BitmapShardCursor { - file_name: String, - reader: Arc, - total_rows: usize, - next_row_offset: usize, - batch: Option, - batch_row_idx: usize, -} - -impl BitmapShardCursor { - async fn try_new(file_name: String, reader: Arc) -> Result> { - let total_rows = reader.num_rows(); - if total_rows == 0 { - return Ok(None); - } - - let mut cursor = Self { - file_name, - reader, - total_rows, - next_row_offset: 0, - batch: None, - batch_row_idx: 0, - }; - if cursor.advance().await? { - Ok(Some(cursor)) - } else { - Ok(None) - } - } - - fn peek_key(&self) -> Result { - let batch = self.batch.as_ref().ok_or_else(|| { - Error::internal(format!( - "Bitmap shard {} has no active batch", - self.file_name - )) - })?; - let key = ScalarValue::try_from_array(batch.column(0), self.batch_row_idx)?; - Ok(OrderableScalarValue(key)) - } - - fn take_current(&mut self) -> Result<(ScalarValue, RowAddrTreeMap)> { - let batch = self.batch.as_ref().ok_or_else(|| { - Error::internal(format!( - "Bitmap shard {} has no active batch", - self.file_name - )) - })?; - let keys = batch.column(0); - let binary_bitmaps = batch - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::corrupt_file( - Path::from(self.file_name.as_str()), - "Bitmap shard batch has non-binary bitmap column".to_string(), - ) - })?; - let key = ScalarValue::try_from_array(keys, self.batch_row_idx)?; - let bitmap = deserialize_bitmap(binary_bitmaps.value(self.batch_row_idx), &self.file_name)?; - self.batch_row_idx += 1; - Ok((key, bitmap)) - } - - async 
fn advance(&mut self) -> Result { - loop { - if let Some(batch) = &self.batch - && self.batch_row_idx < batch.num_rows() - { - return Ok(true); - } - - if self.next_row_offset >= self.total_rows { - self.batch = None; - return Ok(false); - } - - let end_row = (self.next_row_offset + MERGE_ROWS_PER_CHUNK).min(self.total_rows); - let batch = self - .reader - .read_range(self.next_row_offset..end_row, None) - .await?; - self.next_row_offset = end_row; - self.batch = Some(batch); - self.batch_row_idx = 0; - } - } -} - -async fn advance_cursor_and_push( - cursors: &mut [BitmapShardCursor], - heap: &mut BinaryHeap>, - shard_idx: usize, -) -> Result<()> { - if cursors[shard_idx].advance().await? { - heap.push(Reverse(BitmapHeapItem { - key: cursors[shard_idx].peek_key()?, - shard_idx, - })); - } - Ok(()) -} - -async fn drain_same_key_bitmaps( - cursors: &mut [BitmapShardCursor], - heap: &mut BinaryHeap>, - item: BitmapHeapItem, -) -> Result<(ScalarValue, RowAddrTreeMap)> { - let (key, mut merged_bitmap) = cursors[item.shard_idx].take_current()?; - let merged_key = OrderableScalarValue(key); - advance_cursor_and_push(cursors, heap, item.shard_idx).await?; - - loop { - let Some(Reverse(next_item)) = heap.peek() else { - break; - }; - if next_item.key != merged_key { - break; - } - - let shard_idx = next_item.shard_idx; - let _ = heap.pop(); - let (_, bitmap) = cursors[shard_idx].take_current()?; - merged_bitmap |= &bitmap; - advance_cursor_and_push(cursors, heap, shard_idx).await?; - } - - Ok((merged_key.0, merged_bitmap)) -} - -async fn list_bitmap_shard_files( - object_store: &ObjectStore, - index_dir: &Path, - progress: &dyn IndexBuildProgress, -) -> Result> { - let mut shard_files = Vec::new(); - let mut list_stream = object_store.list(Some(index_dir.clone())); - while let Some(item) = list_stream.next().await { - match item { - Ok(meta) => { - let file_name = meta.location.filename().unwrap_or_default(); - if file_name.starts_with(BITMAP_PART_LOOKUP_PREFIX) - && 
file_name.ends_with(BITMAP_PART_LOOKUP_SUFFIX) - { - shard_files.push(file_name.to_string()); - progress - .stage_progress("scan_bitmap_shards", shard_files.len() as u64) - .await?; - } - } - Err(err) => { - return Err(Error::io(format!( - "Failed to list bitmap shard files in {}: {err}", - index_dir - ))); - } - } - } - let mut shard_files = shard_files - .into_iter() - .map(|file_name| extract_bitmap_shard_id(&file_name).map(|shard_id| (shard_id, file_name))) - .collect::>>()?; - shard_files.sort_unstable_by_key(|(shard_id, _)| *shard_id); - let shard_files = shard_files - .into_iter() - .map(|(_, file_name)| file_name) - .collect::>(); - if shard_files.is_empty() { - return Err(Error::invalid_input(format!( - "No bitmap shard files found in index directory: {}; \ - call build_index for each fragment before calling merge_index_metadata", - index_dir - ))); - } - Ok(shard_files) -} - -async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[String]) { - for file_name in shard_files { - if let Err(error) = store.delete_index_file(file_name).await { - warn!( - "Failed to delete bitmap shard file '{}': {}. 
\ - This does not affect the merged bitmap index, but the shard file \ - may need manual cleanup.", - file_name, error - ); - } - } -} - #[derive(Debug, Default)] pub struct BitmapIndexPlugin; @@ -1305,23 +1024,6 @@ impl BitmapIndexPlugin { Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await } - async fn train_bitmap_shard( - data: SendableRecordBatchStream, - index_store: &dyn IndexStore, - fragment_ids: &[u32], - shard_id: Option, - progress: Arc, - ) -> Result<()> { - let partition_id = bitmap_shard_partition_id(fragment_ids, shard_id)?; - let file_name = bitmap_shard_file_name(partition_id); - progress - .stage_start("build_bitmap_shard", None, "rows") - .await?; - Self::streaming_build_and_write(data, None, index_store, &file_name).await?; - progress.stage_complete("build_bitmap_shard").await?; - Ok(()) - } - /// Builds and writes a bitmap index in a streaming fashion from value-sorted /// input. Only one value's bitmap is in memory at a time, reducing peak memory /// from O(unique_values * avg_bitmap) to O(largest_single_bitmap). @@ -1499,104 +1201,21 @@ impl BitmapIndexPlugin { }) .collect() } - - /// Merge per-shard bitmap lookup files into a single bitmap index file. - /// - /// Each shard file is already sorted by key and can contain many distinct keys. - /// This method does not materialize an entire shard in memory. Instead, it keeps - /// one cursor per shard, where each cursor tracks the shard's current row within - /// a small in-memory batch. A min-heap stores the current key for each shard. 
- /// - /// The merge then proceeds as a streaming K-way merge: - /// - pop the smallest current key across all shards - /// - union the bitmap for that key with any other shards currently positioned on - /// the same key - /// - advance only those shards that participated in the union and push their next - /// keys back into the heap - /// - /// This keeps memory usage proportional to the number of shards plus the bitmaps - /// currently being merged, instead of the total number of keys across all shards. - async fn merge_shards( - store: &dyn IndexStore, - shard_files: &[String], - progress: Arc, - ) -> Result<()> { - progress - .stage_start("merge_bitmap_shards", None, "bitmaps") - .await?; - - let mut cursors = Vec::with_capacity(shard_files.len()); - let mut heap = BinaryHeap::with_capacity(shard_files.len()); - let mut value_type: Option = None; - - for file_name in shard_files { - let reader = store.open_index_file(file_name).await?; - let shard_value_type = reader.schema().fields[0].data_type().clone(); - if let Some(existing_type) = &value_type { - if existing_type != &shard_value_type { - return Err(Error::invalid_input(format!( - "Bitmap shard {} has value type {:?}, expected {:?}", - file_name, shard_value_type, existing_type - ))); - } - } else { - value_type = Some(shard_value_type); - } - if let Some(cursor) = BitmapShardCursor::try_new(file_name.clone(), reader).await? 
{ - let key = cursor.peek_key()?; - let shard_idx = cursors.len(); - cursors.push(cursor); - heap.push(Reverse(BitmapHeapItem { key, shard_idx })); - } - } - - let value_type = value_type.ok_or_else(|| { - Error::invalid_input("Bitmap shard merge requires at least one shard file".to_string()) - })?; - let mut writer = new_bitmap_batch_writer(store, BITMAP_LOOKUP_NAME, &value_type).await?; - let mut merged_keys = 0u64; - - while let Some(Reverse(item)) = heap.pop() { - let (key, merged_bitmap) = - drain_same_key_bitmaps(&mut cursors, &mut heap, item).await?; - writer.emit(key, &merged_bitmap).await?; - merged_keys += 1; - progress - .stage_progress("merge_bitmap_shards", merged_keys) - .await?; - } - - progress.stage_complete("merge_bitmap_shards").await?; - progress - .stage_start("write_bitmap_index", Some(1), "files") - .await?; - writer.finish().await?; - progress.stage_progress("write_bitmap_index", 1).await?; - progress.stage_complete("write_bitmap_index").await?; - Ok(()) - } -} - -pub async fn merge_index_files( - object_store: &ObjectStore, - index_dir: &Path, - store: Arc, - progress: Arc, -) -> Result<()> { - progress - .stage_start("scan_bitmap_shards", None, "files") - .await?; - let shard_files = list_bitmap_shard_files(object_store, index_dir, progress.as_ref()).await?; - progress.stage_complete("scan_bitmap_shards").await?; - - BitmapIndexPlugin::merge_shards(store.as_ref(), &shard_files, progress).await?; - cleanup_bitmap_shard_files(store.as_ref(), &shard_files).await; - Ok(()) } +/// Consolidate the materialized state of several bitmap segments (and, +/// optionally, a stream of not-yet-indexed `new_data`) into a single canonical +/// bitmap written to `dest_store`. +/// +/// `old_data_filter` is applied only to the rows coming from `source_indices`, +/// dropping row addresses whose fragments compaction/deletion has retired; rows +/// from `new_data` are inserted unfiltered. 
The whole merged state is held in +/// memory, as bitmap segment consolidation has always done. pub async fn merge_bitmap_indices( source_indices: &[Arc], + new_data: Option, dest_store: &dyn IndexStore, + old_data_filter: Option, progress: Arc, ) -> Result { if source_indices.is_empty() { @@ -1636,6 +1255,18 @@ pub async fn merge_bitmap_indices( .await?; } progress.stage_complete("merge_bitmap_segments").await?; + if let Some(old_data_filter) = old_data_filter { + merged_state.retain(|_, postings| { + old_data_filter.retain_row_addrs(postings); + !postings.is_empty() + }); + } + + // Fold the not-yet-indexed rows into the same in-memory state. + if let Some(new_data) = new_data { + (merged_state, _) = + BitmapIndexPlugin::build_bitmap_index_state(new_data, merged_state).await?; + } progress .stage_start("write_bitmap_index", Some(1), "files") @@ -1700,8 +1331,8 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, request: Box, - fragment_ids: Option>, - progress: Arc, + _fragment_ids: Option>, + _progress: Arc, ) -> Result { let request = request .as_any() @@ -1712,23 +1343,14 @@ impl ScalarIndexPlugin for BitmapIndexPlugin { .to_string(), ) })?; - if let Some(fragment_ids) = fragment_ids.as_ref() { - Self::train_bitmap_shard( - data, - index_store, - fragment_ids, - request.parameters.shard_id, - progress, - ) - .await?; - } else if request.parameters.shard_id.is_some() { - return Err(Error::invalid_input( - "Bitmap shard_id requires fragment_ids and is only supported for distributed shard builds" - .to_string(), - )); - } else { - Self::train_bitmap_index(data, index_store).await?; + if request.parameters.shard_id.is_some() { + warn!( + "Bitmap `shard_id` is deprecated and now ignored; each build now produces one \ + canonical segment. Use the segmented-index APIs instead. The `shard_id` field \ + will be removed in a future release." 
+ ); } + Self::train_bitmap_index(data, index_store).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()) .unwrap(), diff --git a/rust/lance-select/src/mask.rs b/rust/lance-select/src/mask.rs index a10ad9a6f50..c44b77fe770 100644 --- a/rust/lance-select/src/mask.rs +++ b/rust/lance-select/src/mask.rs @@ -572,6 +572,11 @@ impl RowAddrTreeMap { .retain(|frag_id, _| frag_id_set.contains(frag_id)); } + /// Retain only the rows whose fragment id is contained in `keep`. + pub fn retain_fragments_in(&mut self, keep: &RoaringBitmap) { + self.inner.retain(|frag_id, _| keep.contains(*frag_id)); + } + /// Compute the serialized size of the set. pub fn serialized_size(&self) -> usize { // Starts at 4 because of the u32 num_entries diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index a89b64df276..f6e5ce54219 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -94,6 +94,45 @@ pub async fn build_old_data_filter( } } +/// Split the stored fragment coverage of `segments` into fragments still live +/// in `dataset` (`effective`) and fragments that compaction or deletion has +/// already retired (`deleted`). +pub fn split_segment_coverage<'a>( + dataset: &Dataset, + segments: impl IntoIterator, +) -> (RoaringBitmap, RoaringBitmap) { + let mut effective = RoaringBitmap::new(); + let mut deleted = RoaringBitmap::new(); + for segment in segments { + if let Some(eff) = segment.effective_fragment_bitmap(&dataset.fragment_bitmap) { + effective |= eff; + } + if let Some(del) = segment.deleted_fragment_bitmap(&dataset.fragment_bitmap) { + deleted |= del; + } + } + (effective, deleted) +} + +/// Validate that every segment carries fragment coverage, split that coverage +/// into still-live and retired fragments, and build the matching [`OldIndexDataFilter`]. 
+pub async fn effective_coverage_and_filter( + dataset: &Dataset, + segments: &[IndexMetadata], +) -> Result<(RoaringBitmap, Option)> { + for segment in segments { + if segment.fragment_bitmap.is_none() { + return Err(Error::invalid_input(format!( + "CreateIndex: segment {} is missing fragment coverage", + segment.uuid + ))); + } + } + let (effective, deleted) = split_segment_coverage(dataset, segments); + let old_data_filter = build_old_data_filter(dataset, &effective, &deleted).await?; + Ok((effective, old_data_filter)) +} + async fn load_unindexed_training_data( dataset: &Dataset, field_path: &str, @@ -194,16 +233,8 @@ async fn merge_scalar_indices<'a>( .await?; // Effective = bitmap ∩ live fragments; deleted = bitmap \ live fragments. - let mut effective_old_frags = RoaringBitmap::new(); - let mut deleted_old_frags = RoaringBitmap::new(); - for idx in selected_old_indices { - if let Some(effective) = idx.effective_fragment_bitmap(&dataset.fragment_bitmap) { - effective_old_frags |= effective; - } - if let Some(deleted) = idx.deleted_fragment_bitmap(&dataset.fragment_bitmap) { - deleted_old_frags |= deleted; - } - } + let (effective_old_frags, deleted_old_frags) = + split_segment_coverage(dataset.as_ref(), selected_old_indices.iter().copied()); let mut frag_bitmap = base_unindexed_bitmap.clone(); frag_bitmap |= &effective_old_frags; @@ -211,7 +242,7 @@ async fn merge_scalar_indices<'a>( // Scalar Index that expos an N:1 segment-merge primitive reachable without // rescanning the dataset - let has_segment_merge_primitive = matches!(index_type, IndexType::BTree); + let has_segment_merge_primitive = matches!(index_type, IndexType::BTree | IndexType::Bitmap); // Merge new data into the existing segment(s) instead of rebuilding from // scratch, when both hold: @@ -256,6 +287,25 @@ async fn merge_scalar_indices<'a>( ) .await? 
} + IndexType::Bitmap => { + if selected_old_indices.len() == 1 { + // Memory optimization: a single segment can absorb the new data + // via `BitmapIndex::update` without loading all into memory at once. + reference_index + .update(new_data_stream, &new_store, None) + .await? + } else { + crate::index::scalar::bitmap::open_and_merge_segments( + dataset.as_ref(), + field_path, + selected_old_indices, + new_data_stream, + &new_store, + old_data_filter, + ) + .await? + } + } _ => { reference_index .update(new_data_stream, &new_store, old_data_filter) @@ -1710,6 +1760,141 @@ mod tests { assert_eq!(rows, 2, "value 'd' lives in appended fragment"); } + #[tokio::test] + async fn test_optimize_bitmap_multi_segment_consolidation() { + async fn query_count(dataset: &Dataset, value: &str) -> usize { + dataset + .scan() + .filter(&format!("category = '{}'", value)) + .unwrap() + .project(&["category"]) + .unwrap() + .try_into_batch() + .await + .unwrap() + .num_rows() + } + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "category", + DataType::Utf8, + false, + )])); + let make_batch = |labels: &[&str]| { + let arr = StringArray::from_iter_values(labels.iter().copied()); + RecordBatch::try_new(schema.clone(), vec![Arc::new(arr)]).unwrap() + }; + + // Three fragments, each committed as its own Bitmap segment so optimize + // sees a multi-segment logical index. + // frag0={a,b}, frag1={a,c}, frag2={b,c}. 
+ let reader = RecordBatchIterator::new( + vec![ + Ok(make_batch(&["a", "b"])), + Ok(make_batch(&["a", "c"])), + Ok(make_batch(&["b", "c"])), + ], + schema.clone(), + ); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::Bitmap); + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 3); + let frag0_id = fragments[0].id() as u32; + let mut staged_segments = Vec::new(); + for fragment in &fragments { + staged_segments.push( + crate::index::create::CreateIndexBuilder::new( + &mut dataset, + &["category"], + IndexType::Bitmap, + &params, + ) + .name("cat_idx".into()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("cat_idx", "category", staged_segments) + .await + .unwrap(); + assert_eq!( + dataset.load_indices_by_name("cat_idx").await.unwrap().len(), + 3 + ); + + dataset.delete("category IN ('a', 'b')").await.unwrap(); + let live_frag_ids: Vec = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + assert!( + !live_frag_ids.contains(&frag0_id), + "frag0 should be retired after deleting all its rows" + ); + assert_eq!(live_frag_ids.len(), 2); + + // Append a fourth fragment, leave it unindexed. + let appended = RecordBatchIterator::new(vec![Ok(make_batch(&["a", "d"]))], schema.clone()); + let mut dataset = Dataset::write( + appended, + test_uri, + Some(WriteParams { + max_rows_per_file: 2, + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + // merge(3) selects all three old segments (one now backed only by the + // retired frag0) and consolidates them, together with the unindexed + // fragment, into a single segment.
+ dataset + .optimize_indices(&OptimizeOptions::merge(3)) + .await + .unwrap(); + + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + + // Live rows after the delete + append: frag1={c}, frag2={c}, frag3={a,d}. + // The retired frag0's 'a'/'b' rows must not resurface. + assert_eq!(query_count(&dataset, "a").await, 1); + assert_eq!(query_count(&dataset, "b").await, 0); + assert_eq!(query_count(&dataset, "c").await, 2); + assert_eq!(query_count(&dataset, "d").await, 1); + + // The segments collapsed into a single one covering only the still-live + // fragments (frag1, frag2, frag3); the retired frag0 was filtered out of + // the consolidated coverage. + let segments_after = dataset.load_indices_by_name("cat_idx").await.unwrap(); + assert_eq!(segments_after.len(), 1); + let coverage = segments_after[0].fragment_bitmap.as_ref().unwrap(); + assert_eq!(coverage.len(), 3); + assert!( + !coverage.contains(frag0_id), + "retired frag0 must not appear in the consolidated coverage" + ); + } + #[tokio::test] async fn test_optimize_btree_keeps_rows_with_stable_row_ids_after_compaction() { async fn query_id_count(dataset: &Dataset, id: &str) -> usize { diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index ce8e65d8356..2dd1fa3d2e5 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -10,7 +10,7 @@ use crate::{ index::{ DatasetIndexExt, DatasetIndexInternalExt, IntoIndexSegment, build_index_metadata_from_segments, - scalar::{build_bitmap_index_segment, build_scalar_index}, + scalar::build_scalar_index, vector::{ LANCE_VECTOR_INDEX, VectorIndexParams, build_distributed_vector_index, build_empty_vector_index, build_vector_index, @@ -259,44 +259,17 @@ impl<'a> CreateIndexBuilder<'a> { .preprocessed_data .take() .map(|reader| lance_datafusion::utils::reader_to_stream(Box::new(reader))); - if self.index_type == IndexType::Bitmap && self.fragments.is_some() { - if !train { - return 
Err(Error::invalid_input( - "canonical bitmap segment build requires train=true".to_string(), - )); - } - if preprocesssed_data.is_some() { - return Err(Error::invalid_input( - "canonical bitmap segment build does not accept preprocessed data" - .to_string(), - )); - } - let fragments = self.fragments.clone().ok_or_else(|| { - Error::invalid_input( - "canonical bitmap segment build requires fragment ids".to_string(), - ) - })?; - build_bitmap_index_segment( - self.dataset, - column, - &index_id.to_string(), - fragments, - self.progress.clone(), - ) - .await? - } else { - build_scalar_index( - self.dataset, - column, - &index_id.to_string(), - &params, - train, - self.fragments.clone(), - preprocesssed_data, - self.progress.clone(), - ) - .await? - } + build_scalar_index( + self.dataset, + column, + &index_id.to_string(), + &params, + train, + self.fragments.clone(), + preprocesssed_data, + self.progress.clone(), + ) + .await? } (IndexType::Scalar, LANCE_SCALAR_INDEX) => { // Guess the index type @@ -569,6 +542,13 @@ fn is_btree_scalar_params(params: &dyn IndexParams) -> bool { .is_some_and(|p| p.index_type.eq_ignore_ascii_case("btree")) } +fn is_bitmap_scalar_params(params: &dyn IndexParams) -> bool { + params + .as_any() + .downcast_ref::() + .is_some_and(|p| p.index_type.eq_ignore_ascii_case("bitmap")) +} + /// Validate that a user-supplied `index_uuid` is permitted for this build.
fn ensure_index_uuid_allowed( index_type: IndexType, @@ -576,17 +556,16 @@ fn ensure_index_uuid_allowed( fragments: Option<&Vec>, index_uuid: Option<&str>, ) -> Result<()> { - let is_btree = index_type == IndexType::BTree - || params - .as_any() - .downcast_ref::() - .map(|params| params.index_type.eq_ignore_ascii_case("btree")) - .unwrap_or(false); - - if index_uuid.is_some() && fragments.is_some_and(|fragments| !fragments.is_empty()) && is_btree + let is_segmented_scalar = matches!(index_type, IndexType::BTree | IndexType::Bitmap) + || is_btree_scalar_params(params) + || is_bitmap_scalar_params(params); + + if index_uuid.is_some() + && fragments.is_some_and(|fragments| !fragments.is_empty()) + && is_segmented_scalar { return Err(Error::invalid_input( - "index_uuid is no longer accepted for BTree distributed index builds; segment UUIDs \ + "index_uuid is no longer accepted for distributed scalar index builds; segment UUIDs \ are generated by Lance and returned in the index metadata." .to_string(), )); @@ -617,8 +596,9 @@ fn uses_segment_commit_path(index_type: IndexType, params: &dyn IndexParams) -> if params_family == LANCE_SCALAR_INDEX { match index_type { - IndexType::BTree => return true, + IndexType::BTree | IndexType::Bitmap => return true, IndexType::Scalar if is_btree_scalar_params(params) => return true, + IndexType::Scalar if is_bitmap_scalar_params(params) => return true, _ => {} } } @@ -1165,7 +1145,7 @@ mod tests { } #[tokio::test] - async fn test_merge_index_metadata_btree_soft_break() { + async fn test_merge_index_metadata_soft_break() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); let reader = gen_batch() @@ -1176,20 +1156,24 @@ mod tests { ); let dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); - let err = dataset - .merge_index_metadata( - &Uuid::new_v4().to_string(), - IndexType::BTree, - None, - Arc::new(NoopIndexBuildProgress), - ) - .await - .unwrap_err(); - assert!( - 
err.to_string() - .contains("no longer supports merge_index_metadata"), - "expected BTree merge_index_metadata soft-break error, got: {err}" - ); + // Both segmented scalar families have left the legacy distributed-merge + // entry point and must report the soft-break. + for index_type in [IndexType::BTree, IndexType::Bitmap] { + let err = dataset + .merge_index_metadata( + &Uuid::new_v4().to_string(), + index_type, + None, + Arc::new(NoopIndexBuildProgress), + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("no longer supports merge_index_metadata"), + "expected {index_type} merge_index_metadata soft-break error, got: {err}" + ); + } } /// Assert a committed segment directory holds exactly one canonical BTree @@ -1310,7 +1294,7 @@ mod tests { } #[tokio::test] - async fn test_btree_distributed_index_uuid_rejected() { + async fn test_distributed_index_uuid_rejected() { let test_dir = TempStrDir::default(); let dataset = gen_batch() .col("value", lance_datagen::array::step::()) @@ -1324,25 +1308,39 @@ mod tests { let mut dataset = dataset; let fragment_id = dataset.get_fragments()[0].id() as u32; - let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); - for index_type in [IndexType::BTree, IndexType::Scalar] { - let err = CreateIndexBuilder::new(&mut dataset, &["value"], index_type, &params) - .name("value_btree_segments".to_string()) - .fragments(vec![fragment_id]) - .index_uuid(Uuid::new_v4().to_string()) - .execute_uncommitted() - .await - .unwrap_err(); - assert!( - matches!(err, Error::InvalidInput { .. }), - "expected invalid input error, got: {err}" - ); - assert!( - err.to_string().contains( - "index_uuid is no longer accepted for BTree distributed index builds" - ), - "unexpected error: {err}" - ); + // Each segmented scalar family rejects a user-supplied UUID for a + // fragment-scoped build, whether requested via its own IndexType or the + // generic Scalar wrapper.
+ for (builtin, native_type) in [ + ( + lance_index::scalar::BuiltinIndexType::BTree, + IndexType::BTree, + ), + ( + lance_index::scalar::BuiltinIndexType::Bitmap, + IndexType::Bitmap, + ), + ] { + let params = ScalarIndexParams::for_builtin(builtin); + for index_type in [native_type, IndexType::Scalar] { + let err = CreateIndexBuilder::new(&mut dataset, &["value"], index_type, &params) + .name("value_segments".to_string()) + .fragments(vec![fragment_id]) + .index_uuid(Uuid::new_v4().to_string()) + .execute_uncommitted() + .await + .unwrap_err(); + assert!( + matches!(err, Error::InvalidInput { .. }), + "expected invalid input error for {index_type}, got: {err}" + ); + assert!( + err.to_string().contains( + "index_uuid is no longer accepted for distributed scalar index builds" + ), + "unexpected error for {index_type}: {err}" + ); + } } } @@ -1474,30 +1472,37 @@ mod tests { let fragments = dataset.get_fragments(); let fragment_ids: Vec = fragments.iter().map(|f| f.id() as u32).collect(); let selected_fragments = fragment_ids[..2].to_vec(); - let index = - CreateIndexBuilder::new(&mut dataset, &["category"], IndexType::Bitmap, &base_params) - .name("bitmap_segment".to_string()) - .fragments(selected_fragments.clone()) - .execute_uncommitted() - .await - .unwrap(); - assert_eq!( - index - .fragment_bitmap - .as_ref() - .unwrap() - .iter() - .collect::>(), - selected_fragments - ); + for index_type in [IndexType::Bitmap, IndexType::Scalar] { + let index = + CreateIndexBuilder::new(&mut dataset, &["category"], index_type, &base_params) + .name(format!("bitmap_segment_{index_type}")) + .fragments(selected_fragments.clone()) + .execute_uncommitted() + .await + .unwrap(); - let files = index.files.as_ref().unwrap(); - assert!(files.iter().any(|file| file.path == BITMAP_LOOKUP_NAME)); - assert!( - files.iter().all(|file| !file.path.starts_with("part_")), - "staged bitmap segment should only reference canonical files" - ); + assert_eq!( + index + .fragment_bitmap + .as_ref() +
.unwrap() + .iter() + .collect::>(), + selected_fragments, + "{index_type}: unexpected fragment coverage" + ); + + let files = index.files.as_ref().unwrap(); + assert!( + files.iter().any(|file| file.path == BITMAP_LOOKUP_NAME), + "{index_type}: staged segment is missing canonical {BITMAP_LOOKUP_NAME}" + ); + assert!( + files.iter().all(|file| !file.path.starts_with("part_")), + "{index_type}: staged bitmap segment should only reference canonical files" + ); + } } #[tokio::test] diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 92b06f0a1a5..c9618dbff27 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -42,7 +42,7 @@ use lance_index::scalar::label_list::{ use lance_index::scalar::registry::{ ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, VALUE_COLUMN_NAME, }; -use lance_index::scalar::{BuiltinIndexType, CreatedIndex, InvertedIndexParams}; +use lance_index::scalar::{CreatedIndex, InvertedIndexParams}; use lance_index::scalar::{ ScalarIndex, ScalarIndexParams, bitmap::BITMAP_LOOKUP_NAME, inverted::INVERT_LIST_FILE, lance_format::LanceIndexStore, @@ -323,51 +323,6 @@ pub(super) async fn build_scalar_index( Ok(created_index) } -/// Build a canonical bitmap index segment over a caller-selected fragment set. -/// -/// This is intentionally separate from `build_scalar_index(..., fragment_ids=Some(...))`. -/// The latter is the legacy distributed scalar-index shard path. Here fragment ids only -/// restrict the scanned rows; the bitmap plugin receives no shard id and writes the -/// canonical bitmap layout for the staged segment root. 
-#[instrument(level = "debug", skip_all)] -pub(super) async fn build_bitmap_index_segment( - dataset: &Dataset, - column: &str, - uuid: &str, - fragment_ids: Vec, - progress: Arc, -) -> Result { - let field = dataset - .schema() - .field(column) - .ok_or(Error::invalid_input_source( - format!("No column with name {}", column).into(), - ))?; - let field: arrow_schema::Field = field.into(); - - let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap); - let plugin = SCALAR_INDEX_PLUGIN_REGISTRY.get_plugin_by_name(&params.index_type)?; - let training_request = - plugin.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)?; - let criteria = training_request.criteria(); - - progress.stage_start("load_data", None, "rows").await?; - let training_data = - load_training_data(dataset, column, criteria, None, true, Some(fragment_ids)).await?; - progress.stage_complete("load_data").await?; - - let index_store = LanceIndexStore::from_dataset_for_new(dataset, uuid)?; - plugin - .train_index( - training_data, - &index_store, - training_request, - None, - progress, - ) - .await -} - /// Fetches the scalar index plugin for a given index metadata /// /// The fast path, on newer datasets, is just a plugin lookup by the type URL of the index details.
diff --git a/rust/lance/src/index/scalar/bitmap.rs b/rust/lance/src/index/scalar/bitmap.rs index 11214a9bfdc..7411e41876e 100644 --- a/rust/lance/src/index/scalar/bitmap.rs +++ b/rust/lance/src/index/scalar/bitmap.rs @@ -1,16 +1,42 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use datafusion::physical_plan::SendableRecordBatchStream; use lance_index::metrics::NoOpMetricsCollector; use lance_index::scalar::bitmap::BitmapIndex; use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::{CreatedIndex, OldIndexDataFilter}; use lance_table::format::IndexMetadata; -use roaring::RoaringBitmap; use std::sync::Arc; use uuid::Uuid; use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt}; +/// Open the given bitmap `segments` and downcast them to [`BitmapIndex`]. +async fn open_bitmap_segments( + dataset: &Dataset, + field_path: &str, + segments: &[&IndexMetadata], +) -> Result>> { + let mut source_indices = Vec::with_capacity(segments.len()); + for &segment in segments { + let scalar_index = + super::open_scalar_index(dataset, field_path, segment, &NoOpMetricsCollector).await?; + let bitmap_index = scalar_index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index(format!( + "Bitmap merge: expected bitmap segment {}, got {:?}", + segment.uuid, + scalar_index.index_type() + )) + })?; + source_indices.push(Arc::new(bitmap_index.clone())); + } + Ok(source_indices) +} + /// Merge one caller-defined group of source bitmap segments into a single segment. 
pub(in crate::index) async fn merge_segments( dataset: &Dataset, @@ -28,35 +54,22 @@ pub(in crate::index) async fn merge_segments( })?; let field_path = dataset.schema().field_path(field_id)?; - let mut source_indices = Vec::with_capacity(segments.len()); - let mut fragment_bitmap = RoaringBitmap::new(); - for segment in &segments { - fragment_bitmap |= segment.fragment_bitmap.as_ref().cloned().ok_or_else(|| { - Error::invalid_input(format!( - "CreateIndex: segment {} is missing fragment coverage", - segment.uuid - )) - })?; - let scalar_index = - super::open_scalar_index(dataset, &field_path, segment, &NoOpMetricsCollector).await?; - let bitmap_index = scalar_index - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::index(format!( - "merge_existing_index_segments: expected bitmap segment {}, got {:?}", - segment.uuid, - scalar_index.index_type() - )) - })?; - source_indices.push(Arc::new(bitmap_index.clone())); - } + // Intersect each segment's stored coverage with the dataset's current + // fragments so we don't claim coverage on row addresses that compaction or + // pruning has already retired. + let (fragment_bitmap, old_data_filter) = + crate::index::append::effective_coverage_and_filter(dataset, &segments).await?; + + let segment_refs: Vec<&IndexMetadata> = segments.iter().collect(); + let source_indices = open_bitmap_segments(dataset, &field_path, &segment_refs).await?; let new_uuid = Uuid::new_v4(); let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_uuid.to_string())?; let created_index = lance_index::scalar::bitmap::merge_bitmap_indices( &source_indices, + None, &new_store, + old_data_filter, lance_index::progress::noop_progress(), ) .await?; @@ -74,3 +87,24 @@ pub(in crate::index) async fn merge_segments( ..segments[0].clone() }) } + +/// Open the given bitmap `segments` and merge their materialized state, together +/// with `new_data`, into a single canonical bitmap written to `new_store`. 
+pub(in crate::index) async fn open_and_merge_segments( + dataset: &Dataset, + field_path: &str, + segments: &[&IndexMetadata], + new_data: SendableRecordBatchStream, + new_store: &LanceIndexStore, + old_data_filter: Option, +) -> Result { + let source_indices = open_bitmap_segments(dataset, field_path, segments).await?; + lance_index::scalar::bitmap::merge_bitmap_indices( + &source_indices, + Some(new_data), + new_store, + old_data_filter, + lance_index::progress::noop_progress(), + ) + .await +} diff --git a/rust/lance/src/index/scalar/btree.rs b/rust/lance/src/index/scalar/btree.rs index 34534f6811b..d945ba621bd 100644 --- a/rust/lance/src/index/scalar/btree.rs +++ b/rust/lance/src/index/scalar/btree.rs @@ -17,7 +17,6 @@ use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::scalar::registry::VALUE_COLUMN_NAME; use lance_index::scalar::{CreatedIndex, OldIndexDataFilter}; use lance_table::format::IndexMetadata; -use roaring::RoaringBitmap; use uuid::Uuid; use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt}; @@ -121,31 +120,8 @@ pub(crate) async fn merge_segments( // Intersect each segment's stored bitmap with the dataset's current // fragments so we don't claim coverage on IDs that compaction or pruning // has already retired. 
- let dataset_fragments = dataset.fragment_bitmap.as_ref(); - let mut effective_old_frags = RoaringBitmap::new(); - let mut deleted_old_frags = RoaringBitmap::new(); - for segment in &segments { - if segment.fragment_bitmap.is_none() { - return Err(Error::invalid_input(format!( - "CreateIndex: segment {} is missing fragment coverage", - segment.uuid - ))); - } - if let Some(effective) = segment.effective_fragment_bitmap(dataset_fragments) { - effective_old_frags |= effective; - } - if let Some(deleted) = segment.deleted_fragment_bitmap(dataset_fragments) { - deleted_old_frags |= deleted; - } - } - - let fragment_bitmap = effective_old_frags.clone(); - let old_data_filter = crate::index::append::build_old_data_filter( - dataset, - &effective_old_frags, - &deleted_old_frags, - ) - .await?; + let (fragment_bitmap, old_data_filter) = + crate::index::append::effective_coverage_and_filter(dataset, &segments).await?; let output_uuid = Uuid::new_v4(); let new_store = LanceIndexStore::from_dataset_for_new(dataset, &output_uuid.to_string())?; From f10201829eba37fb0c59e1a89326ea041c502dbd Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Fri, 12 Jun 2026 13:02:40 +0800 Subject: [PATCH 2/2] feat(index): consolidate bitmap segments and unindexed data on optimize --- rust/lance-index/src/scalar/bitmap.rs | 30 +-- rust/lance-index/src/scalar/btree.rs | 15 +- rust/lance/src/index/append.rs | 267 ++++++++++++++++++++++++-- rust/lance/src/index/scalar/bitmap.rs | 10 +- rust/lance/src/index/scalar/btree.rs | 10 +- 5 files changed, 294 insertions(+), 38 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index a765cd94dd2..4a212713e1f 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -1204,15 +1204,12 @@ impl BitmapIndexPlugin { /// optionally, a stream of not-yet-indexed `new_data`) into a single canonical /// bitmap written to `dest_store`. 
/// -/// `old_data_filter` is applied only to the rows coming from `source_indices`, -/// dropping row addresses whose fragments compaction/deletion has retired; rows -/// from `new_data` are inserted unfiltered. The whole merged state is held in -/// memory, as bitmap segment consolidation has always done. +/// `old_data_filters` carries one optional filter per source segment pub async fn merge_bitmap_indices( source_indices: &[Arc], new_data: Option, dest_store: &dyn IndexStore, - old_data_filter: Option, + old_data_filters: &[Option], progress: Arc, ) -> Result { if source_indices.is_empty() { @@ -1221,6 +1218,15 @@ pub async fn merge_bitmap_indices( )); } + if old_data_filters.len() != source_indices.len() { + return Err(Error::invalid_input(format!( + "Bitmap merge: expected one old-data filter per source segment \ + ({} segments) but got {}", + source_indices.len(), + old_data_filters.len() + ))); + } + let value_type = source_indices[0].value_type().clone(); let mut merged_state = HashMap::::new(); @@ -1240,7 +1246,13 @@ pub async fn merge_bitmap_indices( ))); } - let state = source_index.load_bitmap_index_state().await?; + let mut state = source_index.load_bitmap_index_state().await?; + if let Some(old_data_filter) = &old_data_filters[idx] { + state.retain(|_, postings| { + old_data_filter.retain_row_addrs(postings); + !postings.is_empty() + }); + } for (key, bitmap) in state { merged_state .entry(key) @@ -1252,12 +1264,6 @@ pub async fn merge_bitmap_indices( .await?; } progress.stage_complete("merge_bitmap_segments").await?; - if let Some(old_data_filter) = old_data_filter { - merged_state.retain(|_, postings| { - old_data_filter.retain_row_addrs(postings); - !postings.is_empty() - }); - } // Fold the not-yet-indexed rows into the same in-memory state. 
if let Some(new_data) = new_data { diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6128248308e..e8e5c42a248 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1798,7 +1798,7 @@ impl BTreeIndex { segments: &[Arc], new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, - old_data_filter: Option, + old_data_filters: &[Option], ) -> Result { let Some(first) = segments.first() else { return Err(Error::invalid_input( @@ -1806,6 +1806,15 @@ impl BTreeIndex { )); }; + if old_data_filters.len() != segments.len() { + return Err(Error::invalid_input(format!( + "BTree merge: expected one old-data filter per source segment \ + ({} segments) but got {}", + segments.len(), + old_data_filters.len() + ))); + } + for segment in segments.iter().skip(1) { if segment.data_type != first.data_type { return Err(Error::index(format!( @@ -1827,7 +1836,7 @@ impl BTreeIndex { } let mut inputs: Vec> = Vec::with_capacity(segments.len() + 1); - for segment in segments { + for (segment, old_data_filter) in segments.iter().zip(old_data_filters) { let stream = segment.data_stream().await?; let stream = match old_data_filter.clone() { Some(filter) => filter_row_ids(stream, filter), @@ -2235,7 +2244,7 @@ impl ScalarIndex for BTreeIndex { &[Arc::new(self.clone())], new_data, dest_store, - old_data_filter, + &[old_data_filter], ) .await } diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 212e9fe9609..388f3170251 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -114,12 +114,32 @@ pub fn split_segment_coverage<'a>( (effective, deleted) } -/// Validate that every segment carries fragment coverage, split that coverage -/// into still-live and retired fragments, and build the matching [`OldIndexDataFilter`]. 
-pub async fn effective_coverage_and_filter( +/// Build one [`OldIndexDataFilter`] per segment, each derived from that +/// segment's *own* effective (still-live) and retired fragment coverage. +pub async fn build_per_segment_filters( + dataset: &Dataset, + segments: &[&IndexMetadata], +) -> Result>> { + let mut filters = Vec::with_capacity(segments.len()); + for segment in segments { + let effective = segment + .effective_fragment_bitmap(&dataset.fragment_bitmap) + .unwrap_or_default(); + let deleted = segment + .deleted_fragment_bitmap(&dataset.fragment_bitmap) + .unwrap_or_default(); + filters.push(build_old_data_filter(dataset, &effective, &deleted).await?); + } + Ok(filters) +} + +/// Validate that every segment carries fragment coverage, then return the +/// combined still-live coverage (for the merged segment's fragment bitmap) +/// together with one [`OldIndexDataFilter`] per segment. +pub async fn effective_coverage_and_filters( dataset: &Dataset, segments: &[IndexMetadata], -) -> Result<(RoaringBitmap, Option)> { +) -> Result<(RoaringBitmap, Vec>)> { for segment in segments { if segment.fragment_bitmap.is_none() { return Err(Error::invalid_input(format!( @@ -128,9 +148,10 @@ pub async fn effective_coverage_and_filter( ))); } } - let (effective, deleted) = split_segment_coverage(dataset, segments); - let old_data_filter = build_old_data_filter(dataset, &effective, &deleted).await?; - Ok((effective, old_data_filter)) + let (effective, _deleted) = split_segment_coverage(dataset, segments); + let segment_refs: Vec<&IndexMetadata> = segments.iter().collect(); + let filters = build_per_segment_filters(dataset, &segment_refs).await?; + Ok((effective, filters)) } async fn load_unindexed_training_data( @@ -271,9 +292,8 @@ async fn merge_scalar_indices<'a>( load_unindexed_training_data(dataset.as_ref(), field_path, &update_criteria, unindexed) .await?; let new_store = LanceIndexStore::from_dataset_for_new(&dataset, &new_uuid)?; - let old_data_filter = - 
build_old_data_filter(dataset.as_ref(), &effective_old_frags, &deleted_old_frags) - .await?; + let old_data_filters = + build_per_segment_filters(dataset.as_ref(), selected_old_indices).await?; match index_type { IndexType::BTree => { @@ -283,7 +303,7 @@ async fn merge_scalar_indices<'a>( selected_old_indices, new_data_stream, &new_store, - old_data_filter, + &old_data_filters, ) .await? } @@ -301,12 +321,22 @@ async fn merge_scalar_indices<'a>( selected_old_indices, new_data_stream, &new_store, - old_data_filter, + &old_data_filters, ) .await? } } _ => { + // Non-segmented scalar types only reach this branch with a single + // selected segment, so the union filter equals that segment's + // filter. Built lazily here so the segmented BTree/Bitmap paths + // above don't pay an extra row-id-sequence load they never use. + let old_data_filter = build_old_data_filter( + dataset.as_ref(), + &effective_old_frags, + &deleted_old_frags, + ) + .await?; reference_index .update(new_data_stream, &new_store, old_data_filter) .await? 
@@ -790,7 +820,7 @@ mod tests { use arrow::datatypes::{Float32Type, UInt32Type}; use arrow_array::cast::AsArray; use arrow_array::{ - FixedSizeListArray, RecordBatch, RecordBatchIterator, StringArray, UInt32Array, + FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt32Array, }; use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; @@ -1984,6 +2014,217 @@ mod tests { ); } + #[tokio::test] + async fn test_optimize_btree_no_duplicate_row_addr() { + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("payload", DataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(Int32Array::from(vec![10])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree); + dataset + .create_index( + &["id"], + IndexType::BTree, + Some("id_idx".into()), + &params, + true, + ) + .await + .unwrap(); + + // Reordered source columns (payload, id) force the partial-schema + // RewriteColumns path instead of a row rewrite.
+ let source_schema = Arc::new(Schema::new(vec![ + Field::new("payload", DataType::Int32, false), + Field::new("id", DataType::Int32, false), + ])); + let source_batch = RecordBatch::try_new( + source_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![100])), + Arc::new(Int32Array::from(vec![1])), + ], + ) + .unwrap(); + let merge_job = + MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .try_build() + .unwrap(); + let source_reader = Box::new(RecordBatchIterator::new( + [Ok(source_batch)], + source_schema.clone(), + )); + merge_job + .execute(reader_to_stream(source_reader)) + .await + .unwrap(); + + // Build a delta BTree segment over the now-unindexed fragment. + let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + dataset + .optimize_indices(&OptimizeOptions::append()) + .await + .unwrap(); + assert_eq!( + dataset.load_indices_by_name("id_idx").await.unwrap().len(), + 2, + "append must create a delta segment over the rewritten fragment" + ); + + // Force the old segment + delta segment to merge. 
+ dataset + .optimize_indices(&OptimizeOptions::merge(2)) + .await + .unwrap(); + + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + let rows = dataset + .scan() + .filter("id = 1") + .unwrap() + .project(&["id"]) + .unwrap() + .try_into_batch() + .await + .unwrap() + .num_rows(); + assert_eq!(rows, 1, "id = 1 must return exactly one row after merge"); + } + + #[tokio::test] + async fn test_optimize_bitmap_no_stale_postings() { + async fn query_count(dataset: &Dataset, value: &str) -> usize { + dataset + .scan() + .filter(&format!("cat = '{}'", value)) + .unwrap() + .project(&["cat"]) + .unwrap() + .try_into_batch() + .await + .unwrap() + .num_rows() + } + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, false), + Field::new("cat", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["a"])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + // A scalar index on the join key forces merge_insert down the in-place + // RewriteColumns path, keeping the fragment live. + dataset + .create_index( + &["key"], + IndexType::BTree, + Some("key_idx".into()), + &ScalarIndexParams::for_builtin(BuiltinIndexType::BTree), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["cat"], + IndexType::Bitmap, + Some("cat_idx".into()), + &ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap), + true, + ) + .await + .unwrap(); + + // Reordered source columns (cat, key) force the in-place RewriteColumns + // path; the indexed `cat` value changes 'a' -> 'b' on the same row, + // pruning the cat index's coverage of the still-live fragment. 
+ let source_schema = Arc::new(Schema::new(vec![ + Field::new("cat", DataType::Utf8, false), + Field::new("key", DataType::Int32, false), + ])); + let source_batch = RecordBatch::try_new( + source_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["b"])), + Arc::new(Int32Array::from(vec![1])), + ], + ) + .unwrap(); + let merge_job = + MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["key".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .try_build() + .unwrap(); + let source_reader = Box::new(RecordBatchIterator::new( + [Ok(source_batch)], + source_schema.clone(), + )); + merge_job + .execute(reader_to_stream(source_reader)) + .await + .unwrap(); + + let cat_only = || OptimizeOptions::append().index_names(vec!["cat_idx".to_string()]); + + let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + dataset.optimize_indices(&cat_only()).await.unwrap(); + assert_eq!( + dataset.load_indices_by_name("cat_idx").await.unwrap().len(), + 2, + "append must create a delta segment over the rewritten fragment" + ); + dataset + .optimize_indices(&OptimizeOptions::merge(2).index_names(vec!["cat_idx".to_string()])) + .await + .unwrap(); + + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + assert_eq!( + query_count(&dataset, "a").await, + 0, + "stale 'a' posting must be filtered out of the consolidated segment" + ); + assert_eq!( + query_count(&dataset, "b").await, + 1, + "the updated 'b' row must remain queryable" + ); + assert_eq!( + dataset.load_indices_by_name("cat_idx").await.unwrap().len(), + 1, + "the segments must collapse into a single consolidated segment" + ); + } + #[tokio::test] async fn test_optimize_btree_keeps_rows_with_stable_row_ids_after_compaction() { async fn query_id_count(dataset: &Dataset, id: &str) -> usize { diff --git a/rust/lance/src/index/scalar/bitmap.rs b/rust/lance/src/index/scalar/bitmap.rs index 06218118467..d5bbdcf2961 100644 --- a/rust/lance/src/index/scalar/bitmap.rs 
+++ b/rust/lance/src/index/scalar/bitmap.rs @@ -57,8 +57,8 @@ pub(in crate::index) async fn merge_segments( // Intersect each segment's stored coverage with the dataset's current // fragments so we don't claim coverage on row addresses that compaction or // pruning has already retired. - let (fragment_bitmap, old_data_filter) = - crate::index::append::effective_coverage_and_filter(dataset, &segments).await?; + let (fragment_bitmap, old_data_filters) = + crate::index::append::effective_coverage_and_filters(dataset, &segments).await?; let segment_refs: Vec<&IndexMetadata> = segments.iter().collect(); let source_indices = open_bitmap_segments(dataset, &field_path, &segment_refs).await?; @@ -69,7 +69,7 @@ pub(in crate::index) async fn merge_segments( &source_indices, None, &new_store, - old_data_filter, + &old_data_filters, lance_index::progress::noop_progress(), ) .await?; @@ -96,14 +96,14 @@ pub(in crate::index) async fn open_and_merge_segments( segments: &[&IndexMetadata], new_data: SendableRecordBatchStream, new_store: &LanceIndexStore, - old_data_filter: Option, + old_data_filters: &[Option], ) -> Result { let source_indices = open_bitmap_segments(dataset, field_path, segments).await?; lance_index::scalar::bitmap::merge_bitmap_indices( &source_indices, Some(new_data), new_store, - old_data_filter, + old_data_filters, lance_index::progress::noop_progress(), ) .await diff --git a/rust/lance/src/index/scalar/btree.rs b/rust/lance/src/index/scalar/btree.rs index 081957ecdad..268048da4dd 100644 --- a/rust/lance/src/index/scalar/btree.rs +++ b/rust/lance/src/index/scalar/btree.rs @@ -63,7 +63,7 @@ pub(crate) async fn open_and_merge_segments( segments: &[&IndexMetadata], new_data: SendableRecordBatchStream, new_store: &LanceIndexStore, - old_data_filter: Option, + old_data_filters: &[Option], ) -> Result { let mut source_indices = Vec::with_capacity(segments.len()); for &segment in segments { @@ -81,7 +81,7 @@ pub(crate) async fn open_and_merge_segments( })?; 
source_indices.push(Arc::new(btree.clone())); } - BTreeIndex::merge_segments(&source_indices, new_data, new_store, old_data_filter).await + BTreeIndex::merge_segments(&source_indices, new_data, new_store, old_data_filters).await } /// Merge one caller-defined group of source BTree segments into a single @@ -120,8 +120,8 @@ pub(crate) async fn merge_segments( // Intersect each segment's stored bitmap with the dataset's current // fragments so we don't claim coverage on IDs that compaction or pruning // has already retired. - let (fragment_bitmap, old_data_filter) = - crate::index::append::effective_coverage_and_filter(dataset, &segments).await?; + let (fragment_bitmap, old_data_filters) = + crate::index::append::effective_coverage_and_filters(dataset, &segments).await?; let output_uuid = Uuid::new_v4(); let new_store = LanceIndexStore::from_dataset_for_new(dataset, &output_uuid)?; @@ -135,7 +135,7 @@ pub(crate) async fn merge_segments( &segment_refs, empty_new_data, &new_store, - old_data_filter, + &old_data_filters, ) .await?;