From 4d67dae7352313d8ff20af488ae9bcb38f801554 Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Thu, 11 Jun 2026 17:12:11 +0800 Subject: [PATCH 1/3] fix: deduplicate BTree flat page row addresses --- rust/lance-index/src/scalar/btree/flat.rs | 97 ++++++++++++++++++- .../src/dataset/tests/dataset_merge_update.rs | 57 +++++++++++ 2 files changed, 149 insertions(+), 5 deletions(-) diff --git a/rust/lance-index/src/scalar/btree/flat.rs b/rust/lance-index/src/scalar/btree/flat.rs index 212ef6490be..be58d0e412a 100644 --- a/rust/lance-index/src/scalar/btree/flat.rs +++ b/rust/lance-index/src/scalar/btree/flat.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::cmp::Ordering; use std::collections::HashMap; use std::{ops::Bound, sync::Arc}; @@ -9,21 +10,21 @@ use arrow_array::{ ArrayRef, BooleanArray, RecordBatch, UInt64Array, cast::AsArray, types::UInt64Type, }; -use datafusion_common::DFSchema; +use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_physical_expr::create_physical_expr; use lance_arrow::RecordBatchExt; use lance_arrow::ipc::{read_ipc_stream_single_at, read_len_prefixed_bytes_at, write_ipc_stream}; -use lance_core::Result; use lance_core::cache::CacheCodecImpl; use lance_core::deepsize::DeepSizeOf; use lance_core::utils::address::RowAddress; +use lance_core::{Error, Result}; use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; use roaring::RoaringBitmap; use tracing::instrument; use crate::metrics::MetricsCollector; -use crate::scalar::btree::BTREE_VALUES_COLUMN; +use crate::scalar::btree::{BTREE_VALUES_COLUMN, OrderableScalarValue}; use crate::scalar::{AnyQuery, SargableQuery}; const VALUES_COL_IDX: usize = 0; @@ -51,8 +52,7 @@ impl DeepSizeOf for FlatIndex { impl FlatIndex { #[instrument(name = "FlatIndex::try_new", level = "debug", skip_all)] pub fn try_new(data: RecordBatch) -> Result { - // Sort by row id to make bitmap construction more efficient - let data = data.sort_by_column(IDS_COL_IDX, None)?; + let data = Self::normalize_batch(data)?; let has_nulls = data.column(VALUES_COL_IDX).null_count() > 0; let all_addrs_map = RowAddrTreeMap::from_sorted_iter( @@ -125,6 +125,59 @@ impl FlatIndex { )?) } + fn normalize_batch(data: RecordBatch) -> Result { + // Sort by row id to make bitmap construction more efficient. + let data = data.sort_by_column(IDS_COL_IDX, None)?; + let row_ids = data.column(IDS_COL_IDX).as_primitive::(); + + if data.num_rows() <= 1 { + return Ok(data); + } + + let mut keep_indices = Vec::with_capacity(data.num_rows()); + keep_indices.push(0); + + for idx in 1..data.num_rows() { + let row_id = row_ids.value(idx); + let last_kept_idx = *keep_indices.last().unwrap(); + let last_kept_row_id = row_ids.value(last_kept_idx); + + if row_id != last_kept_row_id { + keep_indices.push(idx); + continue; + } + + if !Self::values_equal(data.column(VALUES_COL_IDX), idx, last_kept_idx)? { + let value = ScalarValue::try_from_array(data.column(VALUES_COL_IDX), idx)?; + let last_kept_value = + ScalarValue::try_from_array(data.column(VALUES_COL_IDX), last_kept_idx)?; + return Err(Error::internal(format!( + "BTree flat index contains duplicate row id with conflicting values: \ + row_id={row_id}, existing_value={last_kept_value}, duplicate_value={value}" + ))); + } + } + + if keep_indices.len() == data.num_rows() { + return Ok(data); + } + + let keep_indices = + UInt64Array::from_iter_values(keep_indices.into_iter().map(|idx| idx as u64)); + let columns = data + .columns() + .iter() + .map(|column| arrow_select::take::take(column, &keep_indices, None)) + .collect::, arrow_schema::ArrowError>>()?; + Ok(RecordBatch::try_new(data.schema(), columns)?) + } + + fn values_equal(values: &ArrayRef, left_idx: usize, right_idx: usize) -> Result { + let left = OrderableScalarValue(ScalarValue::try_from_array(values, left_idx)?); + let right = OrderableScalarValue(ScalarValue::try_from_array(values, right_idx)?); + Ok(left.cmp(&right) == Ordering::Equal) + } + fn get_null_addrs(sorted_batch: &RecordBatch) -> Result { let null_mask = arrow::compute::is_null(sorted_batch.column(VALUES_COL_IDX))?; let null_ids = arrow_select::filter::filter(sorted_batch.column(IDS_COL_IDX), &null_mask)?; @@ -335,6 +388,40 @@ mod tests { assert_roundtrips(&FlatIndex::try_new(empty).unwrap()); } + #[test] + fn test_load_deduplicates_same_row_addr() { + // Loading should collapse redundant copies of the same value/row pair + // before building row-address maps. + let batch = record_batch!( + (BTREE_VALUES_COLUMN, Int32, [Some(10), Some(10), Some(11)]), + (BTREE_IDS_COLUMN, UInt64, [10, 10, 11]) + ) + .unwrap(); + + let index = FlatIndex::try_new(batch).unwrap(); + + assert_eq!(index.data.num_rows(), 2); + } + + #[test] + fn test_rejects_row_addr_with_different_values() { + // The same row address under different indexed values is not redundant; + // it would allow stale values to return the row. + let batch = record_batch!( + (BTREE_VALUES_COLUMN, Int32, [Some(10), Some(20)]), + (BTREE_IDS_COLUMN, UInt64, [10, 10]) + ) + .unwrap(); + + let err = FlatIndex::try_new(batch).unwrap_err(); + + assert!( + err.to_string() + .contains("BTree flat index contains duplicate row id with conflicting values"), + "{err}" + ); + } + #[tokio::test] async fn test_equality() { check_index(&SargableQuery::Equals(ScalarValue::from(100)), &[0]).await; diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index 7fa03d6e78d..0660e5afcba 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -1626,6 +1626,63 @@ async fn test_merge_insert_with_reordered_columns_and_index() { final_dataset.validate().await.unwrap(); } +#[tokio::test] +async fn test_btree_merge_deduplicates_row_addrs() { + // This public table flow creates an old BTree segment and a delta segment + // for the same row address. Merging them should not leave duplicate row + // addresses in the final flat page. + let batch = arrow_array::record_batch!(("id", Int32, [1]), ("payload", Int32, [10])).unwrap(); + let test_uri = TempStrDir::default(); + let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()); + let mut dataset = Dataset::write(reader, &test_uri, None).await.unwrap(); + + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + let source_batch = + arrow_array::record_batch!(("payload", Int32, [100]), ("id", Int32, [1])).unwrap(); + let merge_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .try_build() + .unwrap(); + let reader = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch.clone())], + source_batch.schema(), + )); + let (updated_dataset, _) = merge_job.execute(reader_to_stream(reader)).await.unwrap(); + let mut dataset = updated_dataset.as_ref().clone(); + + dataset + .optimize_indices(&OptimizeOptions::append()) + .await + .unwrap(); + dataset + .optimize_indices(&OptimizeOptions::merge(2)) + .await + .unwrap(); + + let actual = dataset + .scan() + .filter("id = 1") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!(actual.num_rows(), 1); + assert_eq!(actual["id"].as_primitive::().value(0), 1); + assert_eq!(actual["payload"].as_primitive::().value(0), 100); +} + /// DataReplacement should invalidate index fragment bitmaps for replaced fields. #[tokio::test] async fn test_data_replacement_invalidates_index_bitmap() { From 01c5e33faaa7cd99fb65ec5c70dd532b2ef47e1a Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Fri, 12 Jun 2026 18:33:42 +0800 Subject: [PATCH 2/3] fix: keep BTree read path unchanged --- rust/lance-index/src/scalar/btree/flat.rs | 97 ++--------------------- 1 file changed, 5 insertions(+), 92 deletions(-) diff --git a/rust/lance-index/src/scalar/btree/flat.rs b/rust/lance-index/src/scalar/btree/flat.rs index be58d0e412a..212ef6490be 100644 --- a/rust/lance-index/src/scalar/btree/flat.rs +++ b/rust/lance-index/src/scalar/btree/flat.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::cmp::Ordering; use std::collections::HashMap; use std::{ops::Bound, sync::Arc}; @@ -10,21 +9,21 @@ use arrow_array::{ ArrayRef, BooleanArray, RecordBatch, UInt64Array, cast::AsArray, types::UInt64Type, }; -use datafusion_common::{DFSchema, ScalarValue}; +use datafusion_common::DFSchema; use datafusion_expr::execution_props::ExecutionProps; use datafusion_physical_expr::create_physical_expr; use lance_arrow::RecordBatchExt; use lance_arrow::ipc::{read_ipc_stream_single_at, read_len_prefixed_bytes_at, write_ipc_stream}; +use lance_core::Result; use lance_core::cache::CacheCodecImpl; use lance_core::deepsize::DeepSizeOf; use lance_core::utils::address::RowAddress; -use lance_core::{Error, Result}; use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps}; use roaring::RoaringBitmap; use tracing::instrument; use crate::metrics::MetricsCollector; -use crate::scalar::btree::{BTREE_VALUES_COLUMN, OrderableScalarValue}; +use crate::scalar::btree::BTREE_VALUES_COLUMN; use crate::scalar::{AnyQuery, SargableQuery}; const VALUES_COL_IDX: usize = 0; @@ -52,7 +51,8 @@ impl DeepSizeOf for FlatIndex { impl FlatIndex { #[instrument(name = "FlatIndex::try_new", level = "debug", skip_all)] pub fn try_new(data: RecordBatch) -> Result { - let data = Self::normalize_batch(data)?; + // Sort by row id to make bitmap construction more efficient + let data = data.sort_by_column(IDS_COL_IDX, None)?; let has_nulls = data.column(VALUES_COL_IDX).null_count() > 0; let all_addrs_map = RowAddrTreeMap::from_sorted_iter( @@ -125,59 +125,6 @@ impl FlatIndex { )?) } - fn normalize_batch(data: RecordBatch) -> Result { - // Sort by row id to make bitmap construction more efficient. - let data = data.sort_by_column(IDS_COL_IDX, None)?; - let row_ids = data.column(IDS_COL_IDX).as_primitive::(); - - if data.num_rows() <= 1 { - return Ok(data); - } - - let mut keep_indices = Vec::with_capacity(data.num_rows()); - keep_indices.push(0); - - for idx in 1..data.num_rows() { - let row_id = row_ids.value(idx); - let last_kept_idx = *keep_indices.last().unwrap(); - let last_kept_row_id = row_ids.value(last_kept_idx); - - if row_id != last_kept_row_id { - keep_indices.push(idx); - continue; - } - - if !Self::values_equal(data.column(VALUES_COL_IDX), idx, last_kept_idx)? { - let value = ScalarValue::try_from_array(data.column(VALUES_COL_IDX), idx)?; - let last_kept_value = - ScalarValue::try_from_array(data.column(VALUES_COL_IDX), last_kept_idx)?; - return Err(Error::internal(format!( - "BTree flat index contains duplicate row id with conflicting values: \ - row_id={row_id}, existing_value={last_kept_value}, duplicate_value={value}" - ))); - } - } - - if keep_indices.len() == data.num_rows() { - return Ok(data); - } - - let keep_indices = - UInt64Array::from_iter_values(keep_indices.into_iter().map(|idx| idx as u64)); - let columns = data - .columns() - .iter() - .map(|column| arrow_select::take::take(column, &keep_indices, None)) - .collect::, arrow_schema::ArrowError>>()?; - Ok(RecordBatch::try_new(data.schema(), columns)?) - } - - fn values_equal(values: &ArrayRef, left_idx: usize, right_idx: usize) -> Result { - let left = OrderableScalarValue(ScalarValue::try_from_array(values, left_idx)?); - let right = OrderableScalarValue(ScalarValue::try_from_array(values, right_idx)?); - Ok(left.cmp(&right) == Ordering::Equal) - } - fn get_null_addrs(sorted_batch: &RecordBatch) -> Result { let null_mask = arrow::compute::is_null(sorted_batch.column(VALUES_COL_IDX))?; let null_ids = arrow_select::filter::filter(sorted_batch.column(IDS_COL_IDX), &null_mask)?; @@ -388,40 +335,6 @@ mod tests { assert_roundtrips(&FlatIndex::try_new(empty).unwrap()); } - #[test] - fn test_load_deduplicates_same_row_addr() { - // Loading should collapse redundant copies of the same value/row pair - // before building row-address maps. - let batch = record_batch!( - (BTREE_VALUES_COLUMN, Int32, [Some(10), Some(10), Some(11)]), - (BTREE_IDS_COLUMN, UInt64, [10, 10, 11]) - ) - .unwrap(); - - let index = FlatIndex::try_new(batch).unwrap(); - - assert_eq!(index.data.num_rows(), 2); - } - - #[test] - fn test_rejects_row_addr_with_different_values() { - // The same row address under different indexed values is not redundant; - // it would allow stale values to return the row. - let batch = record_batch!( - (BTREE_VALUES_COLUMN, Int32, [Some(10), Some(20)]), - (BTREE_IDS_COLUMN, UInt64, [10, 10]) - ) - .unwrap(); - - let err = FlatIndex::try_new(batch).unwrap_err(); - - assert!( - err.to_string() - .contains("BTree flat index contains duplicate row id with conflicting values"), - "{err}" - ); - } - #[tokio::test] async fn test_equality() { check_index(&SargableQuery::Equals(ScalarValue::from(100)), &[0]).await; From 39578cac5897c0852984fb28320926601f70a689 Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Fri, 12 Jun 2026 20:11:14 +0800 Subject: [PATCH 3/3] fix: deduplicate BTree merge rows by value --- rust/lance-index/src/scalar/btree.rs | 51 ++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6d21e842e04..371be42126e 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -29,7 +29,8 @@ use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria}; use crate::{pbold, scalar::btree::flat::FlatIndex}; use arrow_arith::numeric::add; use arrow_array::{ - Array, ArrayAccessor, ArrowNativeTypeOp, PrimitiveArray, RecordBatch, UInt32Array, + Array, ArrayAccessor, ArrowNativeTypeOp, BooleanArray, PrimitiveArray, RecordBatch, + UInt32Array, cast::AsArray, new_empty_array, types::{ @@ -1856,7 +1857,8 @@ impl BTreeIndex { ..Default::default() }, )?; - let merged_stream = chunk_concat_stream(unchunked, first.batch_size as usize); + let deduplicated = deduplicate_value_ordered_btree_rows(unchunked); + let merged_stream = chunk_concat_stream(deduplicated, first.batch_size as usize); let files = train_btree_index(merged_stream, dest_store, first.batch_size, None, None).await?; @@ -1889,6 +1891,51 @@ fn filter_row_ids( Box::pin(RecordBatchStreamAdapter::new(schema, filtered)) } +fn deduplicate_value_ordered_btree_rows( + stream: SendableRecordBatchStream, +) -> SendableRecordBatchStream { + let schema = stream.schema(); + let deduplicated = stream::try_unfold( + (stream, None::, HashSet::::new()), + |(mut stream, mut current_value, mut row_ids_for_value)| async move { + loop { + let Some(batch) = stream.next().await.transpose()? else { + return Ok(None); + }; + + let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?; + let row_ids = batch + .column_by_name(ROW_ID) + .expect_ok()? + .as_primitive::(); + let mut mask = Vec::with_capacity(batch.num_rows()); + + for idx in 0..batch.num_rows() { + let value = ScalarValue::try_from_array(values, idx)?; + match current_value.as_ref() { + Some(current) + if OrderableScalarValue(current.clone()) + .cmp(&OrderableScalarValue(value.clone())) + == Ordering::Equal => {} + _ => { + current_value = Some(value); + row_ids_for_value.clear(); + } + } + mask.push(row_ids_for_value.insert(row_ids.value(idx))); + } + + let mask = BooleanArray::from(mask); + let batch = arrow_select::filter::filter_record_batch(&batch, &mask)?; + if batch.num_rows() > 0 { + return Ok(Some((batch, (stream, current_value, row_ids_for_value)))); + } + } + }, + ); + Box::pin(RecordBatchStreamAdapter::new(schema, deduplicated)) +} + fn wrap_bound(bound: &Bound) -> Bound { match bound { Bound::Unbounded => Bound::Unbounded,