Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions rust/lance-index/src/scalar/label_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,75 @@ async fn write_label_list_bitmap_index(
.await
}

/// Merge multiple LabelList index segments into a single index.
///
/// A [`LabelListIndex`] is a [`BitmapIndex`] over the unnested list values plus a
/// separate `list_nulls` row set. Because distributed segments cover disjoint rows
/// (distinct fragments), merging is a cheap union of the underlying bitmap states
/// and of the `list_nulls` sets — no re-scan of source data is required. This
/// mirrors [`crate::scalar::bitmap::merge_bitmap_indices`] but also carries the
/// per-segment `list_nulls`.
pub async fn merge_label_list_indices(
source_indices: &[Arc<LabelListIndex>],
dest_store: &dyn IndexStore,
progress: Arc<dyn crate::progress::IndexBuildProgress>,
) -> Result<CreatedIndex> {
if source_indices.is_empty() {
return Err(Error::invalid_input(
"LabelList segment merge requires at least one source segment".to_string(),
));
}

let value_type = source_indices[0].values_index.value_type().clone();
let mut merged_state = HashMap::<ScalarValue, RowAddrTreeMap>::new();
let mut merged_nulls = RowAddrTreeMap::new();

progress
.stage_start(
"merge_label_list_segments",
Some(source_indices.len() as u64),
"segments",
)
.await?;
for (idx, source_index) in source_indices.iter().enumerate() {
if source_index.values_index.value_type() != &value_type {
return Err(Error::invalid_input(format!(
"LabelList segment has value type {:?}, expected {:?}",
source_index.values_index.value_type(),
value_type
)));
}

let state = source_index.values_index.load_bitmap_index_state().await?;
for (key, bitmap) in state {
merged_state
.entry(key)
.and_modify(|existing| *existing |= &bitmap)
.or_insert(bitmap);
}
merged_nulls |= source_index.list_nulls.as_ref();
progress
.stage_progress("merge_label_list_segments", (idx + 1) as u64)
.await?;
}
progress.stage_complete("merge_label_list_segments").await?;

progress
.stage_start("write_label_list_index", Some(1), "files")
.await?;
let file =
write_label_list_bitmap_index(merged_state, dest_store, &value_type, &merged_nulls).await?;
progress.stage_progress("write_label_list_index", 1).await?;
progress.stage_complete("write_label_list_index").await?;

Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pbold::LabelListIndexDetails::default())
.unwrap(),
index_version: LABEL_LIST_INDEX_VERSION,
files: vec![file],
})
}

/// The serializable state of a [`LabelListIndex`].
///
/// `LabelListIndex` is a thin wrapper around a [`BitmapIndex`] plus a separate
Expand Down Expand Up @@ -636,15 +705,13 @@ impl ScalarIndexPlugin for LabelListIndexPlugin {
data: SendableRecordBatchStream,
index_store: &dyn IndexStore,
_request: Box<dyn TrainingRequest>,
fragment_ids: Option<Vec<u32>>,
// Training over a fragment subset is supported for distributed builds: the
// provided `data` stream is already scoped to those fragments, so a partial
// index covering exactly those rows is produced. Segments are recombined by
// `merge_label_list_indices`.
_fragment_ids: Option<Vec<u32>>,
_progress: Arc<dyn crate::progress::IndexBuildProgress>,
) -> Result<CreatedIndex> {
if fragment_ids.is_some() {
return Err(Error::invalid_input_source(
"LabelList index does not support fragment training".into(),
));
}

let schema = data.schema();
let field = schema
.column_with_name(VALUE_COLUMN_NAME)
Expand Down
18 changes: 17 additions & 1 deletion rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ fn segment_has_fmindex_details(segment: &IndexMetadata) -> bool {
.is_some_and(|details| details.type_url.ends_with("FMIndexIndexDetails"))
}

fn segment_has_label_list_details(segment: &IndexMetadata) -> bool {
segment
.index_details
.as_ref()
.is_some_and(|details| details.type_url.ends_with("LabelListIndexDetails"))
}

// Cache keys for different index types
#[derive(Debug, Clone)]
pub(crate) struct LegacyVectorIndexCacheKey<'a> {
Expand Down Expand Up @@ -1143,7 +1150,14 @@ impl DatasetIndexExt for Dataset {
let all_btree = source_segments.iter().all(segment_has_btree_details);
let all_fmindex = source_segments.iter().all(segment_has_fmindex_details);
let all_zonemap = source_segments.iter().all(segment_has_zonemap_details);
if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_fmindex && !all_zonemap
let all_label_list = source_segments.iter().all(segment_has_label_list_details);
if !all_vector
&& !all_inverted
&& !all_bitmap
&& !all_btree
&& !all_fmindex
&& !all_zonemap
&& !all_label_list
{
return Err(Error::invalid_input(
"merge_existing_index_segments requires all segments to have the same supported index type"
Expand All @@ -1164,6 +1178,8 @@ impl DatasetIndexExt for Dataset {
crate::index::scalar::fmindex::merge_segments(self, source_segments).await?
} else if all_bitmap {
crate::index::scalar::bitmap::merge_segments(self, source_segments).await?
} else if all_label_list {
crate::index::scalar::label_list::merge_segments(self, source_segments).await?
} else if all_zonemap {
crate::index::scalar::zonemap::merge_segments(self, source_segments).await?
} else {
Expand Down
106 changes: 106 additions & 0 deletions rust/lance/src/index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,112 @@ mod tests {
);
}

// Distributed LabelList build: one segment per fragment via
// `execute_uncommitted`, then `merge_existing_index_segments` consolidates them
// into a single canonical segment that answers `array_has_any` across all rows.
#[tokio::test]
async fn test_label_list_merge_existing_index_segments() {
use lance_index::scalar::{LabelListQuery, SearchResult};

// Open `segment` and count rows whose `labels` list contains `label`.
async fn count_has_any(dataset: &Dataset, segment: &IndexMetadata, label: i64) -> usize {
let field_path = dataset.schema().field_path(segment.fields[0]).unwrap();
let index = crate::index::scalar::open_scalar_index(
dataset,
&field_path,
segment,
&NoOpMetricsCollector,
)
.await
.unwrap();
let query = LabelListQuery::HasAnyLabel(vec![ScalarValue::Int64(Some(label))]);
match index.search(&query, &NoOpMetricsCollector).await.unwrap() {
SearchResult::Exact(row_addrs) => {
row_addrs.true_rows().row_addrs().unwrap().count()
}
other => panic!("expected exact result, got {other:?}"),
}
}

let tmpdir = TempStrDir::default();
let dataset_uri = format!("file://{}", tmpdir.as_str());

// 4000 rows across two 2000-row fragments; each `labels` list cycles over 1..=5.
let mut dataset = gen_batch()
.col(
"labels",
lance_datagen::array::rand_list_any(
lance_datagen::array::cycle::<arrow::datatypes::Int64Type>(vec![1, 2, 3, 4, 5]),
false,
),
)
.into_dataset(
&dataset_uri,
FragmentCount::from(2),
FragmentRowCount::from(2000),
)
.await
.unwrap();

// Ground truth via a full scan before any index exists.
let expected = dataset
.scan()
.project(&["labels"])
.unwrap()
.filter("array_has_any(labels, [3])")
.unwrap()
.try_into_batch()
.await
.unwrap()
.num_rows();
assert!(
expected > 0,
"test dataset must contain at least one row whose labels include 3"
);

// One LabelList segment per fragment, committed as a multi-segment index.
let params =
ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::LabelList);
let mut staged = Vec::new();
for fragment in dataset.get_fragments() {
staged.push(
CreateIndexBuilder::new(&mut dataset, &["labels"], IndexType::LabelList, &params)
.name("labels_idx".to_string())
.fragments(vec![fragment.id() as u32])
.execute_uncommitted()
.await
.unwrap(),
);
}
dataset
.commit_existing_index_segments("labels_idx", "labels", staged)
.await
.unwrap();

// Merge the two per-fragment segments into a single segment covering both.
let merged = dataset
.merge_existing_index_segments(
dataset.load_indices_by_name("labels_idx").await.unwrap(),
)
.await
.unwrap();
assert_eq!(
merged.fragment_bitmap.as_ref().unwrap(),
&roaring::RoaringBitmap::from_iter([0u32, 1])
);
assert!(
merged
.index_details
.as_ref()
.unwrap()
.type_url
.ends_with("LabelListIndexDetails")
);
// The merged segment returns every row whose labels include 3 across both
// fragments — i.e. the per-fragment bitmaps and null sets were unioned.
assert_eq!(count_has_any(&dataset, &merged, 3).await, expected);
}

#[tokio::test]
async fn test_commit_existing_index_supports_local_hnsw_segments() {
let tmpdir = TempStrDir::default();
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) mod bitmap;
pub(crate) mod btree;
pub(crate) mod fmindex;
pub(crate) mod inverted;
pub(crate) mod label_list;
pub(crate) mod zonemap;

pub use inverted::{load_segment_details, load_segments};
Expand Down
81 changes: 81 additions & 0 deletions rust/lance/src/index/scalar/label_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use lance_index::metrics::NoOpMetricsCollector;
use lance_index::scalar::label_list::LabelListIndex;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use std::sync::Arc;
use uuid::Uuid;

use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt};

/// Merge one caller-defined group of source LabelList segments into a single segment.
///
/// A LabelList index is a bitmap over the unnested list values plus a `list_nulls`
/// row set, so segments are recombined by unioning the underlying bitmap states and
/// null sets (see [`lance_index::scalar::label_list::merge_label_list_indices`])
/// rather than rebuilding from source text.
pub(in crate::index) async fn merge_segments(
dataset: &Dataset,
segments: Vec<IndexMetadata>,
) -> Result<IndexMetadata> {
if segments.is_empty() {
return Err(Error::index("No segment metadata was provided".to_string()));
}

let field_id = *segments[0].fields.first().ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing field ids",
segments[0].uuid
))
})?;
let field_path = dataset.schema().field_path(field_id)?;

let mut source_indices = Vec::with_capacity(segments.len());
let mut fragment_bitmap = RoaringBitmap::new();
for segment in &segments {
fragment_bitmap |= segment.fragment_bitmap.as_ref().cloned().ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing fragment coverage",
segment.uuid
))
})?;
let scalar_index =
super::open_scalar_index(dataset, &field_path, segment, &NoOpMetricsCollector).await?;
let label_list_index = scalar_index
.as_any()
.downcast_ref::<LabelListIndex>()
.ok_or_else(|| {
Error::index(format!(
"merge_existing_index_segments: expected label list segment {}, got {:?}",
segment.uuid,
scalar_index.index_type()
))
})?;
source_indices.push(Arc::new(label_list_index.clone()));
}

let new_uuid = Uuid::new_v4();
let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_uuid)?;
let created_index = lance_index::scalar::label_list::merge_label_list_indices(
&source_indices,
&new_store,
lance_index::progress::noop_progress(),
)
.await?;

Ok(IndexMetadata {
uuid: new_uuid,
fields: vec![field_id],
dataset_version: dataset.manifest.version,
fragment_bitmap: Some(fragment_bitmap),
index_details: Some(Arc::new(created_index.index_details)),
index_version: created_index.index_version as i32,
created_at: Some(chrono::Utc::now()),
base_id: None,
files: Some(created_index.files),
..segments[0].clone()
})
}
Loading