Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/lance-jni/src/async_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local
_class: JObject<'local>,
jdataset: JObject<'local>,
fragment_ids_obj: JObject<'local>,
index_segments_obj: JObject<'local>,
columns_obj: JObject<'local>,
substrait_filter_obj: JObject<'local>,
filter_obj: JObject<'local>,
Expand All @@ -203,6 +204,7 @@ pub extern "system" fn Java_org_lance_ipc_AsyncScanner_createAsyncScanner<'local
&mut env,
jdataset,
fragment_ids_obj,
index_segments_obj,
columns_obj,
substrait_filter_obj,
filter_obj,
Expand Down Expand Up @@ -231,6 +233,7 @@ fn inner_create_async_scanner<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject<'local>,
fragment_ids_obj: JObject<'local>,
index_segments_obj: JObject<'local>,
columns_obj: JObject<'local>,
substrait_filter_obj: JObject<'local>,
filter_obj: JObject<'local>,
Expand Down Expand Up @@ -258,6 +261,7 @@ fn inner_create_async_scanner<'local>(

let options = ScannerOptions {
fragment_ids_obj,
index_segments_obj,
columns_obj,
substrait_filter_obj,
filter_obj,
Expand Down
51 changes: 32 additions & 19 deletions java/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::traits::{import_vec_from_method, import_vec_to_rust};
use crate::traits::{FromJObjectWithEnv, import_vec_from_method, import_vec_to_rust};
use arrow::array::Float32Array;
use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream};
use arrow_schema::SchemaRef;
Expand All @@ -25,6 +25,7 @@ use lance_index::scalar::inverted::query::{
};
use lance_io::ffi::to_ffi_arrow_array_stream;
use lance_linalg::distance::DistanceType;
use uuid::Uuid;

use crate::{
RT,
Expand Down Expand Up @@ -231,6 +232,7 @@ pub(crate) fn build_full_text_search_query<'a>(
/// Scanner options passed from JNI - shared between blocking and async scanners
pub(crate) struct ScannerOptions<'a> {
pub fragment_ids_obj: JObject<'a>,
pub index_segments_obj: JObject<'a>,
pub columns_obj: JObject<'a>,
pub substrait_filter_obj: JObject<'a>,
pub filter_obj: JObject<'a>,
Expand Down Expand Up @@ -275,6 +277,13 @@ pub(crate) fn build_scanner_with_options<'a>(
scanner.with_fragments(fragments);
}

env.get_optional(&options.index_segments_obj, |env, java_segments| {
let index_segments: Vec<Uuid> =
import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
scanner.with_index_segments(index_segments)?;
Ok(())
})?;

let columns_opt = env.get_strings_opt(&options.columns_obj)?;
if let Some(columns) = columns_opt {
scanner.project(&columns)?;
Expand Down Expand Up @@ -418,26 +427,27 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>(
mut env: JNIEnv<'local>,
_reader: JObject<'local>,
jdataset: JObject<'local>,
fragment_ids_obj: JObject<'local>, // Optional<List<Integer>>
columns_obj: JObject<'local>, // Optional<List<String>>
fragment_ids_obj: JObject<'local>, // Optional<List<Integer>>
index_segments_obj: JObject<'local>, // Optional<List<UUID>>
columns_obj: JObject<'local>, // Optional<List<String>>
substrait_filter_obj: JObject<'local>, // Optional<ByteBuffer>
filter_obj: JObject<'local>, // Optional<String>
batch_size_obj: JObject<'local>, // Optional<Long>
limit_obj: JObject<'local>, // Optional<Integer>
offset_obj: JObject<'local>, // Optional<Integer>
query_obj: JObject<'local>, // Optional<Query>
fts_query_obj: JObject<'local>, // Optional<FullTextQuery>
prefilter: jboolean, // boolean
with_row_id: jboolean, // boolean
with_row_address: jboolean, // boolean
batch_readahead: jint, // int
column_orderings: JObject<'local>, // Optional<List<ColumnOrdering>>
use_scalar_index: jboolean, // boolean
fast_search: jboolean, // boolean
filter_obj: JObject<'local>, // Optional<String>
batch_size_obj: JObject<'local>, // Optional<Long>
limit_obj: JObject<'local>, // Optional<Integer>
offset_obj: JObject<'local>, // Optional<Integer>
query_obj: JObject<'local>, // Optional<Query>
fts_query_obj: JObject<'local>, // Optional<FullTextQuery>
prefilter: jboolean, // boolean
with_row_id: jboolean, // boolean
with_row_address: jboolean, // boolean
batch_readahead: jint, // int
column_orderings: JObject<'local>, // Optional<List<ColumnOrdering>>
use_scalar_index: jboolean, // boolean
fast_search: jboolean, // boolean
substrait_aggregate_obj: JObject<'local>, // Optional<ByteBuffer>
collect_stats: jboolean, // boolean
include_deleted_rows: jboolean, // boolean
strict_batch_size: jboolean, // boolean
collect_stats: jboolean, // boolean
include_deleted_rows: jboolean, // boolean
strict_batch_size: jboolean, // boolean
disable_scoring_autoprojection: jboolean, // boolean
) -> JObject<'local> {
ok_or_throw!(
Expand All @@ -446,6 +456,7 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_createScanner<'local>(
&mut env,
jdataset,
fragment_ids_obj,
index_segments_obj,
columns_obj,
substrait_filter_obj,
filter_obj,
Expand Down Expand Up @@ -475,6 +486,7 @@ fn inner_create_scanner<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject<'local>,
fragment_ids_obj: JObject<'local>,
index_segments_obj: JObject<'local>,
columns_obj: JObject<'local>,
substrait_filter_obj: JObject<'local>,
filter_obj: JObject<'local>,
Expand Down Expand Up @@ -503,6 +515,7 @@ fn inner_create_scanner<'local>(

let options = ScannerOptions {
fragment_ids_obj,
index_segments_obj,
columns_obj,
substrait_filter_obj,
filter_obj,
Expand Down
3 changes: 3 additions & 0 deletions java/src/main/java/org/lance/ipc/AsyncScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -65,6 +66,7 @@ public static AsyncScanner create(
createAsyncScanner(
dataset,
options.getFragmentIds(),
options.getIndexSegments(),
options.getColumns(),
options.getSubstraitFilter(),
options.getFilter(),
Expand All @@ -91,6 +93,7 @@ public static AsyncScanner create(
static native AsyncScanner createAsyncScanner(
Dataset dataset,
Optional<List<Integer>> fragmentIds,
Optional<List<UUID>> indexSegments,
Optional<List<String>> columns,
Optional<ByteBuffer> substraitFilter,
Optional<String> filter,
Expand Down
3 changes: 3 additions & 0 deletions java/src/main/java/org/lance/ipc/LanceScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/** Scanner over a Fragment. */
public class LanceScanner implements org.apache.arrow.dataset.scanner.Scanner {
Expand Down Expand Up @@ -61,6 +62,7 @@ public static LanceScanner create(
createScanner(
dataset,
options.getFragmentIds(),
options.getIndexSegments(),
options.getColumns(),
options.getSubstraitFilter(),
options.getFilter(),
Expand Down Expand Up @@ -90,6 +92,7 @@ public static LanceScanner create(
static native LanceScanner createScanner(
Dataset dataset,
Optional<List<Integer>> fragmentIds,
Optional<List<UUID>> indexSegments,
Optional<List<String>> columns,
Optional<ByteBuffer> substraitFilter,
Optional<String> filter,
Expand Down
109 changes: 109 additions & 0 deletions java/src/main/java/org/lance/ipc/ScanOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/** Lance scan options. */
public class ScanOptions {
private final Optional<List<Integer>> fragmentIds;
private final Optional<List<UUID>> indexSegments;
private final Optional<Long> batchSize;
private final Optional<List<String>> columns;
private final Optional<String> filter;
Expand Down Expand Up @@ -64,6 +66,7 @@ public ScanOptions(
boolean collectStats) {
this(
fragmentIds,
Optional.empty(),
batchSize,
columns,
filter,
Expand Down Expand Up @@ -108,7 +111,10 @@ public ScanOptions(
* @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown.
* @param collectStats Whether to collect scan execution statistics. Default is false.
* @param fastSearch Whether to only search indexed fragments. Default is false.
* @deprecated Use the overload that adds {@code indexSegments} for vector index segment
* selection.
*/
@Deprecated
public ScanOptions(
Optional<List<Integer>> fragmentIds,
Optional<Long> batchSize,
Expand All @@ -131,10 +137,89 @@ public ScanOptions(
boolean includeDeletedRows,
boolean strictBatchSize,
boolean disableScoringAutoprojection) {
this(
fragmentIds,
Optional.empty(),
batchSize,
columns,
filter,
substraitFilter,
limit,
offset,
nearest,
fullTextQuery,
prefilter,
withRowId,
withRowAddress,
batchReadahead,
columnOrderings,
useScalarIndex,
substraitAggregate,
collectStats,
fastSearch,
includeDeletedRows,
strictBatchSize,
disableScoringAutoprojection);
}

/**
* Constructor for ScanOptions.
*
* @param fragmentIds the id of the fragments to scan
* @param indexSegments (Optional) Vector index segment UUIDs to restrict the search to. Only
* valid for nearest-neighbor search. Empty list is rejected by the engine. When combined with
* {@code fragmentIds}, fragments outside the selected segments still fall back to flat KNN.
* @param batchSize Maximum row number of each returned ArrowRecordBatch. Optional, use
* Optional.empty() if unspecified.
* @param columns (Optional) Projected columns. Optional.empty() for scanning all columns.
* Otherwise, only columns present in the List will be scanned.
* @param filter (Optional) Filter expression. Optional.empty() for no filter.
* @param substraitFilter (Optional) Substrait filter expression.
* @param fullTextQuery (Optional) Full text search query.
* @param limit (Optional) Maximum number of rows to return.
* @param offset (Optional) Number of rows to skip before returning results.
* @param withRowId Whether to include the row ID in the results.
* @param withRowAddress Whether to include the row address in the results.
* @param nearest (Optional) Nearest neighbor query.
* @param batchReadahead Number of batches to read ahead.
* @param columnOrderings (Optional) Column orderings for result sorting.
* @param useScalarIndex Whether to use scalar indices for the scan. Default is true.
* @param substraitAggregate (Optional) Substrait aggregate expression for aggregate pushdown.
* @param collectStats Whether to collect scan execution statistics. Default is false.
* @param fastSearch Whether to only search indexed fragments. Default is false.
* @param includeDeletedRows Whether to include deleted rows in scan results. Default is false.
* @param strictBatchSize Whether to enforce strict batch sizing. Default is false.
* @param disableScoringAutoprojection Whether to disable scoring column autoprojection. Default
* is false.
*/
public ScanOptions(
Optional<List<Integer>> fragmentIds,
Optional<List<UUID>> indexSegments,
Optional<Long> batchSize,
Optional<List<String>> columns,
Optional<String> filter,
Optional<ByteBuffer> substraitFilter,
Optional<Long> limit,
Optional<Long> offset,
Optional<Query> nearest,
Optional<FullTextQuery> fullTextQuery,
boolean prefilter,
boolean withRowId,
boolean withRowAddress,
int batchReadahead,
Optional<List<ColumnOrdering>> columnOrderings,
boolean useScalarIndex,
Optional<ByteBuffer> substraitAggregate,
boolean collectStats,
boolean fastSearch,
boolean includeDeletedRows,
boolean strictBatchSize,
boolean disableScoringAutoprojection) {
Preconditions.checkArgument(
!(filter.isPresent() && substraitFilter.isPresent()),
"cannot set both substrait filter and string filter");
this.fragmentIds = fragmentIds;
this.indexSegments = indexSegments;
this.batchSize = batchSize;
this.columns = columns;
this.filter = filter;
Expand Down Expand Up @@ -166,6 +251,15 @@ public Optional<List<Integer>> getFragmentIds() {
return fragmentIds;
}

/**
 * Returns the index segment UUIDs configured on these scan options.
 *
 * @return an Optional holding the list of index segment UUIDs when one was supplied, or an empty
 *     Optional when no index segments were specified.
 */
public Optional<List<UUID>> getIndexSegments() {
  return this.indexSegments;
}

/**
* Get the batch size.
*
Expand Down Expand Up @@ -340,6 +434,7 @@ public boolean isDisableScoringAutoprojection() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("fragmentIds", fragmentIds.orElse(null))
.add("indexSegments", indexSegments.orElse(null))
.add("batchSize", batchSize.orElse(null))
.add("columns", columns.orElse(null))
.add("filter", filter.orElse(null))
Expand Down Expand Up @@ -370,6 +465,7 @@ public String toString() {
/** Builder for constructing ScanOptions. */
public static class Builder {
private Optional<List<Integer>> fragmentIds = Optional.empty();
private Optional<List<UUID>> indexSegments = Optional.empty();
private Optional<Long> batchSize = Optional.empty();
private Optional<List<String>> columns = Optional.empty();
private Optional<String> filter = Optional.empty();
Expand Down Expand Up @@ -400,6 +496,7 @@ public Builder() {}
*/
public Builder(ScanOptions options) {
this.fragmentIds = options.getFragmentIds();
this.indexSegments = options.getIndexSegments();
this.batchSize = options.getBatchSize();
this.columns = options.getColumns();
this.filter = options.getFilter();
Expand Down Expand Up @@ -433,6 +530,17 @@ public Builder fragmentIds(List<Integer> fragmentIds) {
return this;
}

/**
 * Select the index segments, identified by UUID, that vector search should use.
 *
 * @param indexSegments the UUIDs of the index segments to use; must not be null
 * @return this Builder, so that calls can be chained.
 * @throws NullPointerException if {@code indexSegments} is null
 */
public Builder indexSegments(List<UUID> indexSegments) {
  // Optional.of (not ofNullable) is intentional here: a null list is rejected immediately.
  Optional<List<UUID>> selectedSegments = Optional.of(indexSegments);
  this.indexSegments = selectedSegments;
  return this;
}

/**
* Set the batch size.
*
Expand Down Expand Up @@ -666,6 +774,7 @@ public Builder disableScoringAutoprojection(boolean disableScoringAutoprojection
public ScanOptions build() {
return new ScanOptions(
fragmentIds,
indexSegments,
batchSize,
columns,
filter,
Expand Down
Loading