From 055df348a9f3f7f3e284b02bfc81316e94830749 Mon Sep 17 00:00:00 2001 From: zhoubin11 Date: Tue, 9 Jun 2026 11:50:53 +0800 Subject: [PATCH] feat(java): support segment-based distributed vector search Expose index segment selection through the Java scan APIs so distributed vector search callers can target specific vector index segments. The Java ScanOptions builder now carries an optional List, and both blocking and async scanner paths forward it through JNI to Rust scanner construction. This threads the new option through LanceScanner, AsyncScanner, and the shared Rust ScannerOptions. JNI converts Java UUID values into Rust Uuid values and applies them with scanner.with_index_segments(...), leaving validation to the Rust scanner implementation. Test Coverage: No new tests were added. Suggested follow-up coverage includes Java/JNI blocking and async scanner tests for segment-restricted vector search and validation errors for invalid segment usage. Co-Authored-By: Claude Opus 4.8 Change-Id: I0cdc80404bcc406a9bf460ef061eb2bd0f6a70d1 --- java/lance-jni/src/async_scanner.rs | 4 + java/lance-jni/src/blocking_scanner.rs | 51 +++++--- .../main/java/org/lance/ipc/AsyncScanner.java | 3 + .../main/java/org/lance/ipc/LanceScanner.java | 3 + .../main/java/org/lance/ipc/ScanOptions.java | 109 ++++++++++++++++++ 5 files changed, 151 insertions(+), 19 deletions(-) diff --git a/java/lance-jni/src/async_scanner.rs b/java/lance-jni/src/async_scanner.rs index 6da10479266..8a4328dc214 100644 --- a/java/lance-jni/src/async_scanner.rs +++ b/java/lance-jni/src/async_scanner.rs @@ -177,6 +177,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local _class: JObject<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -203,6 +204,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local &mut env, jdataset, fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, @@ -231,6 +233,7 @@ fn inner_create_async_scanner<'local>( env: &mut JNIEnv<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -258,6 +261,7 @@ fn inner_create_async_scanner<'local>( let options = ScannerOptions { fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, diff --git a/java/lance-jni/src/blocking_scanner.rs b/java/lance-jni/src/blocking_scanner.rs index 700dea2f944..bb8e62a19b2 100644 --- a/java/lance-jni/src/blocking_scanner.rs +++ b/java/lance-jni/src/blocking_scanner.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use crate::error::{Error, Result}; use crate::ffi::JNIEnvExt; -use crate::traits::{import_vec_from_method, import_vec_to_rust}; +use crate::traits::{FromJObjectWithEnv, import_vec_from_method, import_vec_to_rust}; use arrow::array::Float32Array; use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream}; use arrow_schema::SchemaRef; @@ -25,6 +25,7 @@ use lance_index::scalar::inverted::query::{ }; use lance_io::ffi::to_ffi_arrow_array_stream; use lance_linalg::distance::DistanceType; +use uuid::Uuid; use crate::{ RT, @@ -231,6 +232,7 @@ pub(crate) fn build_full_text_search_query<'a>( /// Scanner options passed from JNI - shared between blocking and async scanners pub(crate) struct ScannerOptions<'a> { pub fragment_ids_obj: JObject<'a>, + pub index_segments_obj: JObject<'a>, pub columns_obj: JObject<'a>, pub substrait_filter_obj: JObject<'a>, pub filter_obj: JObject<'a>, @@ -275,6 +277,13 @@ pub(crate) fn build_scanner_with_options<'a>( scanner.with_fragments(fragments); } + env.get_optional(&options.index_segments_obj, |env, java_segments| { + let index_segments: Vec = + import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + scanner.with_index_segments(index_segments)?; + Ok(()) + })?; + let columns_opt = env.get_strings_opt(&options.columns_obj)?; if let Some(columns) = columns_opt { scanner.project(&columns)?; @@ -418,26 +427,27 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>( mut env: JNIEnv<'local>, _reader: JObject<'local>, jdataset: JObject<'local>, - fragment_ids_obj: JObject<'local>, // Optional> - columns_obj: JObject<'local>, // Optional> + fragment_ids_obj: JObject<'local>, // Optional> + index_segments_obj: JObject<'local>, // Optional> + columns_obj: JObject<'local>, // Optional> substrait_filter_obj: JObject<'local>, // Optional - filter_obj: JObject<'local>, // Optional - batch_size_obj: JObject<'local>, // Optional - limit_obj: JObject<'local>, // Optional - offset_obj: JObject<'local>, // Optional - query_obj: JObject<'local>, // Optional - fts_query_obj: JObject<'local>, // Optional - prefilter: jboolean, // boolean - with_row_id: jboolean, // boolean - with_row_address: jboolean, // boolean - batch_readahead: jint, // int - column_orderings: JObject<'local>, // Optional> - use_scalar_index: jboolean, // boolean - fast_search: jboolean, // boolean + filter_obj: JObject<'local>, // Optional + batch_size_obj: JObject<'local>, // Optional + limit_obj: JObject<'local>, // Optional + offset_obj: JObject<'local>, // Optional + query_obj: JObject<'local>, // Optional + fts_query_obj: JObject<'local>, // Optional + prefilter: jboolean, // boolean + with_row_id: jboolean, // boolean + with_row_address: jboolean, // boolean + batch_readahead: jint, // int + column_orderings: JObject<'local>, // Optional> + use_scalar_index: jboolean, // boolean + fast_search: jboolean, // boolean substrait_aggregate_obj: JObject<'local>, // Optional - collect_stats: jboolean, // boolean - include_deleted_rows: jboolean, // boolean - strict_batch_size: jboolean, // boolean + collect_stats: jboolean, // boolean + include_deleted_rows: jboolean, // boolean + strict_batch_size: jboolean, // boolean disable_scoring_autoprojection: jboolean, // boolean ) -> JObject<'local> { ok_or_throw!( @@ -446,6 +456,7 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>( &mut env, jdataset, fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, @@ -475,6 +486,7 @@ fn inner_create_scanner<'local>( env: &mut JNIEnv<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -503,6 +515,7 @@ fn inner_create_scanner<'local>( let options = ScannerOptions { fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, diff --git a/java/src/main/java/org/lance/ipc/AsyncScanner.java b/java/src/main/java/org/lance/ipc/AsyncScanner.java index 6e515e3546c..e241682719a 100644 --- a/java/src/main/java/org/lance/ipc/AsyncScanner.java +++ b/java/src/main/java/org/lance/ipc/AsyncScanner.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -65,6 +66,7 @@ public static AsyncScanner create( createAsyncScanner( dataset, options.getFragmentIds(), + options.getIndexSegments(), options.getColumns(), options.getSubstraitFilter(), options.getFilter(), @@ -91,6 +93,7 @@ public static AsyncScanner create( static native AsyncScanner createAsyncScanner( Dataset dataset, Optional> fragmentIds, + Optional> indexSegments, Optional> columns, Optional substraitFilter, Optional filter, diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 3a413e0ccfd..f541625e9b7 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; /** Scanner over a Fragment. */ public class LanceScanner implements org.apache.arrow.dataset.scanner.Scanner { @@ -61,6 +62,7 @@ public static LanceScanner create( createScanner( dataset, options.getFragmentIds(), + options.getIndexSegments(), options.getColumns(), options.getSubstraitFilter(), options.getFilter(), @@ -90,6 +92,7 @@ public static LanceScanner create( static native LanceScanner createScanner( Dataset dataset, Optional> fragmentIds, + Optional> indexSegments, Optional> columns, Optional substraitFilter, Optional filter, diff --git a/java/src/main/java/org/lance/ipc/ScanOptions.java b/java/src/main/java/org/lance/ipc/ScanOptions.java index a9aad590c2b..a1d18631f74 100644 --- a/java/src/main/java/org/lance/ipc/ScanOptions.java +++ b/java/src/main/java/org/lance/ipc/ScanOptions.java @@ -19,10 +19,12 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; /** Lance scan options. */ public class ScanOptions { private final Optional> fragmentIds; + private final Optional> indexSegments; private final Optional batchSize; private final Optional> columns; private final Optional filter; @@ -64,6 +66,7 @@ public ScanOptions( boolean collectStats) { this( fragmentIds, + Optional.empty(), batchSize, columns, filter, @@ -108,7 +111,10 @@ public ScanOptions( * @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown. * @param collectStats Whether to collect scan execution statistics. Default is false. * @param fastSearch Whether to only search indexed fragments. Default is false. + * @deprecated Use the overload that adds {@code indexSegments} for vector index segment + * selection. */ + @Deprecated public ScanOptions( Optional> fragmentIds, Optional batchSize, @@ -131,10 +137,89 @@ public ScanOptions( boolean includeDeletedRows, boolean strictBatchSize, boolean disableScoringAutoprojection) { + this( + fragmentIds, + Optional.empty(), + batchSize, + columns, + filter, + substraitFilter, + limit, + offset, + nearest, + fullTextQuery, + prefilter, + withRowId, + withRowAddress, + batchReadahead, + columnOrderings, + useScalarIndex, + substraitAggregate, + collectStats, + fastSearch, + includeDeletedRows, + strictBatchSize, + disableScoringAutoprojection); + } + + /** + * Constructor for LanceScanOptions. + * + * @param fragmentIds the id of the fragments to scan + * @param indexSegments (Optional) Vector index segment UUIDs to restrict the search to. Only + * valid for nearest-neighbor search. Empty list is rejected by the engine. When combined with + * {@code fragmentIds}, fragments outside the selected segments still fall back to flat KNN. + * @param batchSize Maximum row number of each returned ArrowRecordBatch. Optional, use + * Optional.empty() if unspecified. + * @param columns (Optional) Projected columns. Optional.empty() for scanning all columns. + * Otherwise, only columns present in the List will be scanned. + * @param filter (Optional) Filter expression. Optional.empty() for no filter. + * @param substraitFilter (Optional) Substrait filter expression. + * @param filter (Optional) Filter expression. Optional.empty() for no filter. + * @param limit (Optional) Maximum number of rows to return. + * @param offset (Optional) Number of rows to skip before returning results. + * @param withRowId Whether to include the row ID in the results. + * @param withRowAddress Whether to include the row address in the results. + * @param nearest (Optional) Nearest neighbor query. + * @param batchReadahead Number of batches to read ahead. + * @param columnOrderings (Optional) Column orderings for result sorting. + * @param useScalarIndex Whether to use scalar indices for the scan. Default is true. + * @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown. + * @param collectStats Whether to collect scan execution statistics. Default is false. + * @param fastSearch Whether to only search indexed fragments. Default is false. + * @param includeDeletedRows Whether to include deleted rows in scan results. Default is false. + * @param strictBatchSize Whether to enforce strict batch sizing. Default is false. + * @param disableScoringAutoprojection Whether to disable scoring column autoprojection. Default + * is false. + */ + public ScanOptions( + Optional> fragmentIds, + Optional> indexSegments, + Optional batchSize, + Optional> columns, + Optional filter, + Optional substraitFilter, + Optional limit, + Optional offset, + Optional nearest, + Optional fullTextQuery, + boolean prefilter, + boolean withRowId, + boolean withRowAddress, + int batchReadahead, + Optional> columnOrderings, + boolean useScalarIndex, + Optional substraitAggregate, + boolean collectStats, + boolean fastSearch, + boolean includeDeletedRows, + boolean strictBatchSize, + boolean disableScoringAutoprojection) { Preconditions.checkArgument( !(filter.isPresent() && substraitFilter.isPresent()), "cannot set both substrait filter and string filter"); this.fragmentIds = fragmentIds; + this.indexSegments = indexSegments; this.batchSize = batchSize; this.columns = columns; this.filter = filter; @@ -166,6 +251,15 @@ public Optional> getFragmentIds() { return fragmentIds; } + /** + * Get the index segment UUIDs. + * + * @return Optional containing the index segment UUIDs if specified, otherwise empty. + */ + public Optional> getIndexSegments() { + return indexSegments; + } + /** * Get the batch size. * @@ -340,6 +434,7 @@ public boolean isDisableScoringAutoprojection() { public String toString() { return MoreObjects.toStringHelper(this) .add("fragmentIds", fragmentIds.orElse(null)) + .add("indexSegments", indexSegments.orElse(null)) .add("batchSize", batchSize.orElse(null)) .add("columns", columns.orElse(null)) .add("filter", filter.orElse(null)) @@ -370,6 +465,7 @@ public String toString() { /** Builder for constructing LanceScanOptions. */ public static class Builder { private Optional> fragmentIds = Optional.empty(); + private Optional> indexSegments = Optional.empty(); private Optional batchSize = Optional.empty(); private Optional> columns = Optional.empty(); private Optional filter = Optional.empty(); @@ -400,6 +496,7 @@ public Builder() {} */ public Builder(ScanOptions options) { this.fragmentIds = options.getFragmentIds(); + this.indexSegments = options.getIndexSegments(); this.batchSize = options.getBatchSize(); this.columns = options.getColumns(); this.filter = options.getFilter(); @@ -433,6 +530,17 @@ public Builder fragmentIds(List fragmentIds) { return this; } + /** + * Set the index segment UUIDs to use for vector search. + * + * @param indexSegments the index segment UUIDs to use + * @return Builder instance for method chaining. + */ + public Builder indexSegments(List indexSegments) { + this.indexSegments = Optional.of(indexSegments); + return this; + } + /** * Set the batch size. * @@ -666,6 +774,7 @@ public Builder disableScoringAutoprojection(boolean disableScoringAutoprojection public ScanOptions build() { return new ScanOptions( fragmentIds, + indexSegments, batchSize, columns, filter,