From 55174c0048a8e314682b765b289018616de1a43b Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 10 Jun 2026 18:34:41 -0700 Subject: [PATCH] feat(index): support distributed LabelList scalar index builds LabelListIndexPlugin::train_index rejected fragment-scoped training, so a LabelList index could not be built through the distributed / segmented path (per-fragment execute_uncommitted + merge_existing_index_segments) the way BTree, Inverted, Bitmap, and FM already are. Build over the already fragment-scoped data stream (mirroring FMIndexPlugin) and add merge_label_list_indices, which unions the per-segment bitmap states and list_nulls row sets. Route LabelList segments through merge_existing_index_segments. --- rust/lance-index/src/scalar/label_list.rs | 81 +++++++++++++++-- rust/lance/src/index.rs | 18 +++- rust/lance/src/index/create.rs | 106 ++++++++++++++++++++++ rust/lance/src/index/scalar.rs | 1 + rust/lance/src/index/scalar/label_list.rs | 81 +++++++++++++++++ 5 files changed, 279 insertions(+), 8 deletions(-) create mode 100644 rust/lance/src/index/scalar/label_list.rs diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index cf357d89585..1df72099675 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -491,6 +491,75 @@ async fn write_label_list_bitmap_index( .await } +/// Merge multiple LabelList index segments into a single index. +/// +/// A [`LabelListIndex`] is a [`BitmapIndex`] over the unnested list values plus a +/// separate `list_nulls` row set. Because distributed segments cover disjoint rows +/// (distinct fragments), merging is a cheap union of the underlying bitmap states +/// and of the `list_nulls` sets — no re-scan of source data is required. This +/// mirrors [`crate::scalar::bitmap::merge_bitmap_indices`] but also carries the +/// per-segment `list_nulls`. +pub async fn merge_label_list_indices( + source_indices: &[Arc], + dest_store: &dyn IndexStore, + progress: Arc, +) -> Result { + if source_indices.is_empty() { + return Err(Error::invalid_input( + "LabelList segment merge requires at least one source segment".to_string(), + )); + } + + let value_type = source_indices[0].values_index.value_type().clone(); + let mut merged_state = HashMap::::new(); + let mut merged_nulls = RowAddrTreeMap::new(); + + progress + .stage_start( + "merge_label_list_segments", + Some(source_indices.len() as u64), + "segments", + ) + .await?; + for (idx, source_index) in source_indices.iter().enumerate() { + if source_index.values_index.value_type() != &value_type { + return Err(Error::invalid_input(format!( + "LabelList segment has value type {:?}, expected {:?}", + source_index.values_index.value_type(), + value_type + ))); + } + + let state = source_index.values_index.load_bitmap_index_state().await?; + for (key, bitmap) in state { + merged_state + .entry(key) + .and_modify(|existing| *existing |= &bitmap) + .or_insert(bitmap); + } + merged_nulls |= source_index.list_nulls.as_ref(); + progress + .stage_progress("merge_label_list_segments", (idx + 1) as u64) + .await?; + } + progress.stage_complete("merge_label_list_segments").await?; + + progress + .stage_start("write_label_list_index", Some(1), "files") + .await?; + let file = + write_label_list_bitmap_index(merged_state, dest_store, &value_type, &merged_nulls).await?; + progress.stage_progress("write_label_list_index", 1).await?; + progress.stage_complete("write_label_list_index").await?; + + Ok(CreatedIndex { + index_details: prost_types::Any::from_msg(&pbold::LabelListIndexDetails::default()) + .unwrap(), + index_version: LABEL_LIST_INDEX_VERSION, + files: vec![file], + }) +} + /// The serializable state of a [`LabelListIndex`]. /// /// `LabelListIndex` is a thin wrapper around a [`BitmapIndex`] plus a separate @@ -636,15 +705,13 @@ impl ScalarIndexPlugin for LabelListIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, _request: Box, - fragment_ids: Option>, + // Training over a fragment subset is supported for distributed builds: the + // provided `data` stream is already scoped to those fragments, so a partial + // index covering exactly those rows is produced. Segments are recombined by + // `merge_label_list_indices`. + _fragment_ids: Option>, _progress: Arc, ) -> Result { - if fragment_ids.is_some() { - return Err(Error::invalid_input_source( - "LabelList index does not support fragment training".into(), - )); - } - let schema = data.schema(); let field = schema .column_with_name(VALUE_COLUMN_NAME) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 8984d507408..3515c9758c3 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -286,6 +286,13 @@ fn segment_has_fmindex_details(segment: &IndexMetadata) -> bool { .is_some_and(|details| details.type_url.ends_with("FMIndexIndexDetails")) } +fn segment_has_label_list_details(segment: &IndexMetadata) -> bool { + segment + .index_details + .as_ref() + .is_some_and(|details| details.type_url.ends_with("LabelListIndexDetails")) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub(crate) struct LegacyVectorIndexCacheKey<'a> { @@ -1143,7 +1150,14 @@ impl DatasetIndexExt for Dataset { let all_btree = source_segments.iter().all(segment_has_btree_details); let all_fmindex = source_segments.iter().all(segment_has_fmindex_details); let all_zonemap = source_segments.iter().all(segment_has_zonemap_details); - if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_fmindex && !all_zonemap + let all_label_list = source_segments.iter().all(segment_has_label_list_details); + if !all_vector + && !all_inverted + && !all_bitmap + && !all_btree + && !all_fmindex + && !all_zonemap + && !all_label_list { return Err(Error::invalid_input( "merge_existing_index_segments requires all segments to have the same supported index type" @@ -1164,6 +1178,8 @@ impl DatasetIndexExt for Dataset { crate::index::scalar::fmindex::merge_segments(self, source_segments).await? } else if all_bitmap { crate::index::scalar::bitmap::merge_segments(self, source_segments).await? + } else if all_label_list { + crate::index::scalar::label_list::merge_segments(self, source_segments).await? } else if all_zonemap { crate::index::scalar::zonemap::merge_segments(self, source_segments).await? } else { diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 2b6992e4849..bf0a3e535b4 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -2234,6 +2234,112 @@ mod tests { ); } + // Distributed LabelList build: one segment per fragment via + // `execute_uncommitted`, then `merge_existing_index_segments` consolidates them + // into a single canonical segment that answers `array_has_any` across all rows. + #[tokio::test] + async fn test_label_list_merge_existing_index_segments() { + use lance_index::scalar::{LabelListQuery, SearchResult}; + + // Open `segment` and count rows whose `labels` list contains `label`. + async fn count_has_any(dataset: &Dataset, segment: &IndexMetadata, label: i64) -> usize { + let field_path = dataset.schema().field_path(segment.fields[0]).unwrap(); + let index = crate::index::scalar::open_scalar_index( + dataset, + &field_path, + segment, + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let query = LabelListQuery::HasAnyLabel(vec![ScalarValue::Int64(Some(label))]); + match index.search(&query, &NoOpMetricsCollector).await.unwrap() { + SearchResult::Exact(row_addrs) => { + row_addrs.true_rows().row_addrs().unwrap().count() + } + other => panic!("expected exact result, got {other:?}"), + } + } + + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + // 4000 rows across two 2000-row fragments; each `labels` list cycles over 1..=5. + let mut dataset = gen_batch() + .col( + "labels", + lance_datagen::array::rand_list_any( + lance_datagen::array::cycle::(vec![1, 2, 3, 4, 5]), + false, + ), + ) + .into_dataset( + &dataset_uri, + FragmentCount::from(2), + FragmentRowCount::from(2000), + ) + .await + .unwrap(); + + // Ground truth via a full scan before any index exists. + let expected = dataset + .scan() + .project(&["labels"]) + .unwrap() + .filter("array_has_any(labels, [3])") + .unwrap() + .try_into_batch() + .await + .unwrap() + .num_rows(); + assert!( + expected > 0, + "test dataset must contain at least one row whose labels include 3" + ); + + // One LabelList segment per fragment, committed as a multi-segment index. + let params = + ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::LabelList); + let mut staged = Vec::new(); + for fragment in dataset.get_fragments() { + staged.push( + CreateIndexBuilder::new(&mut dataset, &["labels"], IndexType::LabelList, ¶ms) + .name("labels_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("labels_idx", "labels", staged) + .await + .unwrap(); + + // Merge the two per-fragment segments into a single segment covering both. + let merged = dataset + .merge_existing_index_segments( + dataset.load_indices_by_name("labels_idx").await.unwrap(), + ) + .await + .unwrap(); + assert_eq!( + merged.fragment_bitmap.as_ref().unwrap(), + &roaring::RoaringBitmap::from_iter([0u32, 1]) + ); + assert!( + merged + .index_details + .as_ref() + .unwrap() + .type_url + .ends_with("LabelListIndexDetails") + ); + // The merged segment returns every row whose labels include 3 across both + // fragments — i.e. the per-fragment bitmaps and null sets were unioned. + assert_eq!(count_has_any(&dataset, &merged, 3).await, expected); + } + #[tokio::test] async fn test_commit_existing_index_supports_local_hnsw_segments() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 9fb756ea154..75b03f6d318 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -8,6 +8,7 @@ pub(crate) mod bitmap; pub(crate) mod btree; pub(crate) mod fmindex; pub(crate) mod inverted; +pub(crate) mod label_list; pub(crate) mod zonemap; pub use inverted::{load_segment_details, load_segments}; diff --git a/rust/lance/src/index/scalar/label_list.rs b/rust/lance/src/index/scalar/label_list.rs new file mode 100644 index 00000000000..e68bfb7b15d --- /dev/null +++ b/rust/lance/src/index/scalar/label_list.rs @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use lance_index::metrics::NoOpMetricsCollector; +use lance_index::scalar::label_list::LabelListIndex; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; +use std::sync::Arc; +use uuid::Uuid; + +use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt}; + +/// Merge one caller-defined group of source LabelList segments into a single segment. +/// +/// A LabelList index is a bitmap over the unnested list values plus a `list_nulls` +/// row set, so segments are recombined by unioning the underlying bitmap states and +/// null sets (see [`lance_index::scalar::label_list::merge_label_list_indices`]) +/// rather than rebuilding from source text. +pub(in crate::index) async fn merge_segments( + dataset: &Dataset, + segments: Vec, +) -> Result { + if segments.is_empty() { + return Err(Error::index("No segment metadata was provided".to_string())); + } + + let field_id = *segments[0].fields.first().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing field ids", + segments[0].uuid + )) + })?; + let field_path = dataset.schema().field_path(field_id)?; + + let mut source_indices = Vec::with_capacity(segments.len()); + let mut fragment_bitmap = RoaringBitmap::new(); + for segment in &segments { + fragment_bitmap |= segment.fragment_bitmap.as_ref().cloned().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing fragment coverage", + segment.uuid + )) + })?; + let scalar_index = + super::open_scalar_index(dataset, &field_path, segment, &NoOpMetricsCollector).await?; + let label_list_index = scalar_index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index(format!( + "merge_existing_index_segments: expected label list segment {}, got {:?}", + segment.uuid, + scalar_index.index_type() + )) + })?; + source_indices.push(Arc::new(label_list_index.clone())); + } + + let new_uuid = Uuid::new_v4(); + let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_uuid)?; + let created_index = lance_index::scalar::label_list::merge_label_list_indices( + &source_indices, + &new_store, + lance_index::progress::noop_progress(), + ) + .await?; + + Ok(IndexMetadata { + uuid: new_uuid, + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(created_index.files), + ..segments[0].clone() + }) +}