diff --git a/java/lance-jni/src/async_scanner.rs b/java/lance-jni/src/async_scanner.rs index 6da10479266..8a4328dc214 100644 --- a/java/lance-jni/src/async_scanner.rs +++ b/java/lance-jni/src/async_scanner.rs @@ -177,6 +177,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local _class: JObject<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -203,6 +204,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local &mut env, jdataset, fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, @@ -231,6 +233,7 @@ fn inner_create_async_scanner<'local>( env: &mut JNIEnv<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -258,6 +261,7 @@ fn inner_create_async_scanner<'local>( let options = ScannerOptions { fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, diff --git a/java/lance-jni/src/blocking_scanner.rs b/java/lance-jni/src/blocking_scanner.rs index 700dea2f944..bb8e62a19b2 100644 --- a/java/lance-jni/src/blocking_scanner.rs +++ b/java/lance-jni/src/blocking_scanner.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex}; use crate::error::{Error, Result}; use crate::ffi::JNIEnvExt; -use crate::traits::{import_vec_from_method, import_vec_to_rust}; +use crate::traits::{FromJObjectWithEnv, import_vec_from_method, import_vec_to_rust}; use arrow::array::Float32Array; use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream}; use arrow_schema::SchemaRef; @@ -25,6 +25,7 @@ use lance_index::scalar::inverted::query::{ }; use lance_io::ffi::to_ffi_arrow_array_stream; use lance_linalg::distance::DistanceType; +use uuid::Uuid; use crate::{ RT, @@ -231,6 +232,7 @@ pub(crate) fn build_full_text_search_query<'a>( /// Scanner options passed from JNI - shared between blocking and async scanners pub(crate) struct ScannerOptions<'a> { pub fragment_ids_obj: JObject<'a>, + pub index_segments_obj: JObject<'a>, pub columns_obj: JObject<'a>, pub substrait_filter_obj: JObject<'a>, pub filter_obj: JObject<'a>, @@ -275,6 +277,13 @@ pub(crate) fn build_scanner_with_options<'a>( scanner.with_fragments(fragments); } + env.get_optional(&options.index_segments_obj, |env, java_segments| { + let index_segments: Vec = + import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + scanner.with_index_segments(index_segments)?; + Ok(()) + })?; + let columns_opt = env.get_strings_opt(&options.columns_obj)?; if let Some(columns) = columns_opt { scanner.project(&columns)?; @@ -418,26 +427,27 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>( mut env: JNIEnv<'local>, _reader: JObject<'local>, jdataset: JObject<'local>, - fragment_ids_obj: JObject<'local>, // Optional> - columns_obj: JObject<'local>, // Optional> + fragment_ids_obj: JObject<'local>, // Optional> + index_segments_obj: JObject<'local>, // Optional> + columns_obj: JObject<'local>, // Optional> substrait_filter_obj: JObject<'local>, // Optional - filter_obj: JObject<'local>, // Optional - batch_size_obj: JObject<'local>, // Optional - limit_obj: JObject<'local>, // Optional - offset_obj: JObject<'local>, // Optional - query_obj: JObject<'local>, // Optional - fts_query_obj: JObject<'local>, // Optional - prefilter: jboolean, // boolean - with_row_id: jboolean, // boolean - with_row_address: jboolean, // boolean - batch_readahead: jint, // int - column_orderings: JObject<'local>, // Optional> - use_scalar_index: jboolean, // boolean - fast_search: jboolean, // boolean + filter_obj: JObject<'local>, // Optional + batch_size_obj: JObject<'local>, // Optional + limit_obj: JObject<'local>, // Optional + offset_obj: JObject<'local>, // Optional + query_obj: JObject<'local>, // Optional + fts_query_obj: JObject<'local>, // Optional + prefilter: jboolean, // boolean + with_row_id: jboolean, // boolean + with_row_address: jboolean, // boolean + batch_readahead: jint, // int + column_orderings: JObject<'local>, // Optional> + use_scalar_index: jboolean, // boolean + fast_search: jboolean, // boolean substrait_aggregate_obj: JObject<'local>, // Optional - collect_stats: jboolean, // boolean - include_deleted_rows: jboolean, // boolean - strict_batch_size: jboolean, // boolean + collect_stats: jboolean, // boolean + include_deleted_rows: jboolean, // boolean + strict_batch_size: jboolean, // boolean disable_scoring_autoprojection: jboolean, // boolean ) -> JObject<'local> { ok_or_throw!( @@ -446,6 +456,7 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>( &mut env, jdataset, fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, @@ -475,6 +486,7 @@ fn inner_create_scanner<'local>( env: &mut JNIEnv<'local>, jdataset: JObject<'local>, fragment_ids_obj: JObject<'local>, + index_segments_obj: JObject<'local>, columns_obj: JObject<'local>, substrait_filter_obj: JObject<'local>, filter_obj: JObject<'local>, @@ -503,6 +515,7 @@ fn inner_create_scanner<'local>( let options = ScannerOptions { fragment_ids_obj, + index_segments_obj, columns_obj, substrait_filter_obj, filter_obj, diff --git a/java/src/main/java/org/lance/ipc/AsyncScanner.java b/java/src/main/java/org/lance/ipc/AsyncScanner.java index 6e515e3546c..e241682719a 100644 --- a/java/src/main/java/org/lance/ipc/AsyncScanner.java +++ b/java/src/main/java/org/lance/ipc/AsyncScanner.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -65,6 +66,7 @@ public static AsyncScanner create( createAsyncScanner( dataset, options.getFragmentIds(), + options.getIndexSegments(), options.getColumns(), options.getSubstraitFilter(), options.getFilter(), @@ -91,6 +93,7 @@ public static AsyncScanner create( static native AsyncScanner createAsyncScanner( Dataset dataset, Optional> fragmentIds, + Optional> indexSegments, Optional> columns, Optional substraitFilter, Optional filter, diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 3a413e0ccfd..f541625e9b7 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; /** Scanner over a Fragment. */ public class LanceScanner implements org.apache.arrow.dataset.scanner.Scanner { @@ -61,6 +62,7 @@ public static LanceScanner create( createScanner( dataset, options.getFragmentIds(), + options.getIndexSegments(), options.getColumns(), options.getSubstraitFilter(), options.getFilter(), @@ -90,6 +92,7 @@ public static LanceScanner create( static native LanceScanner createScanner( Dataset dataset, Optional> fragmentIds, + Optional> indexSegments, Optional> columns, Optional substraitFilter, Optional filter, diff --git a/java/src/main/java/org/lance/ipc/ScanOptions.java b/java/src/main/java/org/lance/ipc/ScanOptions.java index a9aad590c2b..a1d18631f74 100644 --- a/java/src/main/java/org/lance/ipc/ScanOptions.java +++ b/java/src/main/java/org/lance/ipc/ScanOptions.java @@ -19,10 +19,12 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Optional; +import java.util.UUID; /** Lance scan options. */ public class ScanOptions { private final Optional> fragmentIds; + private final Optional> indexSegments; private final Optional batchSize; private final Optional> columns; private final Optional filter; @@ -64,6 +66,7 @@ public ScanOptions( boolean collectStats) { this( fragmentIds, + Optional.empty(), batchSize, columns, filter, @@ -108,7 +111,10 @@ public ScanOptions( * @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown. * @param collectStats Whether to collect scan execution statistics. Default is false. * @param fastSearch Whether to only search indexed fragments. Default is false. + * @deprecated Use the overload that adds {@code indexSegments} for vector index segment + * selection. */ + @Deprecated public ScanOptions( Optional> fragmentIds, Optional batchSize, @@ -131,10 +137,89 @@ public ScanOptions( boolean includeDeletedRows, boolean strictBatchSize, boolean disableScoringAutoprojection) { + this( + fragmentIds, + Optional.empty(), + batchSize, + columns, + filter, + substraitFilter, + limit, + offset, + nearest, + fullTextQuery, + prefilter, + withRowId, + withRowAddress, + batchReadahead, + columnOrderings, + useScalarIndex, + substraitAggregate, + collectStats, + fastSearch, + includeDeletedRows, + strictBatchSize, + disableScoringAutoprojection); + } + + /** + * Constructor for LanceScanOptions. + * + * @param fragmentIds the id of the fragments to scan + * @param indexSegments (Optional) Vector index segment UUIDs to restrict the search to. Only + * valid for nearest-neighbor search. Empty list is rejected by the engine. When combined with + * {@code fragmentIds}, fragments outside the selected segments still fall back to flat KNN. + * @param batchSize Maximum row number of each returned ArrowRecordBatch. Optional, use + * Optional.empty() if unspecified. + * @param columns (Optional) Projected columns. Optional.empty() for scanning all columns. + * Otherwise, only columns present in the List will be scanned. + * @param filter (Optional) Filter expression. Optional.empty() for no filter. + * @param substraitFilter (Optional) Substrait filter expression. + * @param filter (Optional) Filter expression. Optional.empty() for no filter. + * @param limit (Optional) Maximum number of rows to return. + * @param offset (Optional) Number of rows to skip before returning results. + * @param withRowId Whether to include the row ID in the results. + * @param withRowAddress Whether to include the row address in the results. + * @param nearest (Optional) Nearest neighbor query. + * @param batchReadahead Number of batches to read ahead. + * @param columnOrderings (Optional) Column orderings for result sorting. + * @param useScalarIndex Whether to use scalar indices for the scan. Default is true. + * @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown. + * @param collectStats Whether to collect scan execution statistics. Default is false. + * @param fastSearch Whether to only search indexed fragments. Default is false. + * @param includeDeletedRows Whether to include deleted rows in scan results. Default is false. + * @param strictBatchSize Whether to enforce strict batch sizing. Default is false. + * @param disableScoringAutoprojection Whether to disable scoring column autoprojection. Default + * is false. + */ + public ScanOptions( + Optional> fragmentIds, + Optional> indexSegments, + Optional batchSize, + Optional> columns, + Optional filter, + Optional substraitFilter, + Optional limit, + Optional offset, + Optional nearest, + Optional fullTextQuery, + boolean prefilter, + boolean withRowId, + boolean withRowAddress, + int batchReadahead, + Optional> columnOrderings, + boolean useScalarIndex, + Optional substraitAggregate, + boolean collectStats, + boolean fastSearch, + boolean includeDeletedRows, + boolean strictBatchSize, + boolean disableScoringAutoprojection) { Preconditions.checkArgument( !(filter.isPresent() && substraitFilter.isPresent()), "cannot set both substrait filter and string filter"); this.fragmentIds = fragmentIds; + this.indexSegments = indexSegments; this.batchSize = batchSize; this.columns = columns; this.filter = filter; @@ -166,6 +251,15 @@ public Optional> getFragmentIds() { return fragmentIds; } + /** + * Get the index segment UUIDs. + * + * @return Optional containing the index segment UUIDs if specified, otherwise empty. + */ + public Optional> getIndexSegments() { + return indexSegments; + } + /** * Get the batch size. * @@ -340,6 +434,7 @@ public boolean isDisableScoringAutoprojection() { public String toString() { return MoreObjects.toStringHelper(this) .add("fragmentIds", fragmentIds.orElse(null)) + .add("indexSegments", indexSegments.orElse(null)) .add("batchSize", batchSize.orElse(null)) .add("columns", columns.orElse(null)) .add("filter", filter.orElse(null)) @@ -370,6 +465,7 @@ public String toString() { /** Builder for constructing LanceScanOptions. */ public static class Builder { private Optional> fragmentIds = Optional.empty(); + private Optional> indexSegments = Optional.empty(); private Optional batchSize = Optional.empty(); private Optional> columns = Optional.empty(); private Optional filter = Optional.empty(); @@ -400,6 +496,7 @@ public Builder() {} */ public Builder(ScanOptions options) { this.fragmentIds = options.getFragmentIds(); + this.indexSegments = options.getIndexSegments(); this.batchSize = options.getBatchSize(); this.columns = options.getColumns(); this.filter = options.getFilter(); @@ -433,6 +530,17 @@ public Builder fragmentIds(List fragmentIds) { return this; } + /** + * Set the index segment UUIDs to use for vector search. + * + * @param indexSegments the index segment UUIDs to use + * @return Builder instance for method chaining. + */ + public Builder indexSegments(List indexSegments) { + this.indexSegments = Optional.of(indexSegments); + return this; + } + /** * Set the batch size. * @@ -666,6 +774,7 @@ public Builder disableScoringAutoprojection(boolean disableScoringAutoprojection public ScanOptions build() { return new ScanOptions( fragmentIds, + indexSegments, batchSize, columns, filter,