From 4a02bf471aa7d862cfe0e550196e2c9d8f2dcf2b Mon Sep 17 00:00:00 2001 From: zhoubin11 Date: Wed, 10 Jun 2026 18:02:53 +0800 Subject: [PATCH] feat: support distributed VECTOR_SEARCH Change-Id: I94c3cd431bcf5ba4bee7906838fa2d7cd4f6769e --- .../SparkDistributedVectorSearchTest.java | 16 + .../SparkDistributedVectorSearchTest.java | 16 + .../spark/internal/LanceFragmentScanner.java | 46 ++- ...ceMergedSearchColumnarPartitionReader.java | 221 +++++++++++++ .../LanceSearchColumnarPartitionReader.java | 2 +- .../search/LanceSearchInputPartition.java | 98 ++++++ .../LanceSearchPartitionReaderFactory.java | 8 +- .../lance/spark/search/LanceSearchQuery.java | 76 +++++ .../search/LanceSearchRowPartitionReader.java | 4 +- .../lance/spark/search/LanceSearchScan.java | 293 +++++++++++++++++- .../spark/search/LanceSearchScanBuilder.java | 32 +- .../lance/spark/search/LanceSearchTable.java | 42 ++- .../search/LanceSearchTableFunctions.scala | 70 ++++- .../BaseSparkDistributedVectorSearchTest.java | 195 ++++++++++++ 14 files changed, 1092 insertions(+), 27 deletions(-) create mode 100644 lance-spark-3.4_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java create mode 100644 lance-spark-3.5_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceMergedSearchColumnarPartitionReader.java create mode 100644 lance-spark-base_2.12/src/test/java/org/lance/spark/search/BaseSparkDistributedVectorSearchTest.java diff --git a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java new file mode 100644 index 000000000..8cc47f00c --- /dev/null +++ b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java @@ -0,0 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the 
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.search; + +public class SparkDistributedVectorSearchTest extends BaseSparkDistributedVectorSearchTest {} diff --git a/lance-spark-3.5_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java new file mode 100644 index 000000000..8cc47f00c --- /dev/null +++ b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/search/SparkDistributedVectorSearchTest.java @@ -0,0 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.lance.spark.search; + +public class SparkDistributedVectorSearchTest extends BaseSparkDistributedVectorSearchTest {} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java index c2cb39a4c..554be577c 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -79,20 +80,13 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in LanceScanner lanceScanner = null; try { LanceSparkReadOptions readOptions = inputPartition.getReadOptions(); - if (inputPartition.getNamespaceImpl() != null && readOptions.isExecutorCredentialRefresh()) { - if (LanceRuntime.useNamespaceOnWorkers(inputPartition.getNamespaceImpl())) { - readOptions.setNamespace( - LanceRuntime.getOrCreateNamespace( - inputPartition.getNamespaceImpl(), inputPartition.getNamespaceProperties())); - } else { - readOptions.setNamespace(null); - } - } long dsOpenStart = System.nanoTime(); dataset = - Utils.openDatasetBuilder(readOptions) - .initialStorageOptions(inputPartition.getInitialStorageOptions()) - .build(); + openDatasetForWorker( + readOptions, + inputPartition.getNamespaceImpl(), + inputPartition.getNamespaceProperties(), + inputPartition.getInitialStorageOptions()); long dsOpenTimeNs = System.nanoTime() - dsOpenStart; Fragment fragment = dataset.getFragment(fragmentId); if (fragment == null) { @@ -171,6 +165,34 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in } } + /** + * Open a Lance dataset on a worker, reusing the same credential-refresh logic that {@link + * #create(int, 
LanceInputPartition)} applies. Used by both the read scan path (via {@code + * create}) and the distributed search worker. + * + * @param readOptions read options carrying URI, version, and (optionally) namespace info + * @param namespaceImpl namespace implementation type, may be null + * @param namespaceProperties namespace properties, may be null + * @param initialStorageOptions storage options vended by the driver, may be null + */ + public static Dataset openDatasetForWorker( + LanceSparkReadOptions readOptions, + String namespaceImpl, + Map namespaceProperties, + Map initialStorageOptions) { + if (namespaceImpl != null && readOptions.isExecutorCredentialRefresh()) { + if (LanceRuntime.useNamespaceOnWorkers(namespaceImpl)) { + readOptions.setNamespace( + LanceRuntime.getOrCreateNamespace(namespaceImpl, namespaceProperties)); + } else { + readOptions.setNamespace(null); + } + } + return Utils.openDatasetBuilder(readOptions) + .initialStorageOptions(initialStorageOptions) + .build(); + } + /** * @return the arrow reader. The caller is responsible for closing the reader */ diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceMergedSearchColumnarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceMergedSearchColumnarPartitionReader.java new file mode 100644 index 000000000..fc4771e4b --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceMergedSearchColumnarPartitionReader.java @@ -0,0 +1,221 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.search; + +import org.lance.Dataset; +import org.lance.index.DistanceType; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.Query; +import org.lance.ipc.ScanOptions; +import org.lance.spark.LanceConstant; +import org.lance.spark.internal.LanceFragmentScanner; + +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +/** + * Worker-side partition reader for distributed VECTOR_SEARCH. Opens the Lance dataset locally and + * runs one of: + * + *
<ul>
+ *   <li>indexed unit ({@code indexSegments} non-empty) → {@code ScanOptions.indexSegments(...)}
+ *   <li>fallback unit ({@code fragmentIds} non-empty) → {@code ScanOptions.fragmentIds(...)}
+ * </ul>
+ * + * and iterates Arrow batches into Spark {@link ColumnarBatch}. + */ +public class LanceMergedSearchColumnarPartitionReader implements PartitionReader { + private final LanceSearchInputPartition partition; + private Dataset dataset; + private LanceScanner scanner; + private ArrowReader reader; + private ColumnarBatch currentBatch; + private boolean finished; + + public LanceMergedSearchColumnarPartitionReader(LanceSearchInputPartition partition) { + this.partition = partition; + } + + @Override + public boolean next() throws IOException { + if (finished) { + return false; + } + if (reader == null) { + openReader(); + } + if (reader.loadNextBatch()) { + currentBatch = + LanceSearchColumnarPartitionReader.toColumnarBatch( + reader.getVectorSchemaRoot(), partition.getSchema()); + return true; + } + finished = true; + return false; + } + + @Override + public ColumnarBatch get() { + return currentBatch; + } + + @Override + public void close() throws IOException { + Throwable first = null; + if (currentBatch != null) { + try { + currentBatch.close(); + } catch (Throwable t) { + first = t; + } + } + first = closeQuietly(reader, first); + first = closeQuietly(scanner, first); + first = closeQuietly(dataset, first); + if (first != null) { + throw new IOException("Failed to close LanceMergedSearchColumnarPartitionReader", first); + } + } + + private static Throwable closeQuietly(AutoCloseable closeable, Throwable carried) { + if (closeable == null) { + return carried; + } + try { + closeable.close(); + return carried; + } catch (Throwable t) { + return carried == null ? 
t : carried; + } + } + + private void openReader() throws IOException { + dataset = + LanceFragmentScanner.openDatasetForWorker( + partition.getReadOptions(), + partition.getNamespaceImpl(), + partition.getNamespaceProperties(), + partition.getInitialStorageOptions()); + ScanOptions opts = buildScanOptions(partition); + try { + scanner = dataset.newScan(opts); + reader = scanner.scanBatches(); + } catch (Exception e) { + throw new IOException( + "Failed to open distributed search scan for partition: " + describe(partition), e); + } + } + + private static ScanOptions buildScanOptions(LanceSearchInputPartition p) { + LanceSearchQuery base = p.getQuery(); + String column = base.getVectorColumn(); + if (column == null || column.isEmpty()) { + throw new IllegalStateException( + "vector column must be resolved on the driver before scheduling worker tasks"); + } + + Query.Builder q = + new Query.Builder() + .setColumn(column) + .setKey(toFloatArray(base.getVector())) + .setK(base.getK()); + if (base.getDistanceType() != null && !base.getDistanceType().isEmpty()) { + q.setDistanceType(parseDistanceType(base.getDistanceType())); + } + if (base.getNprobes() != null) { + q.setMinimumNprobes(base.getNprobes()); + } + if (base.getEf() != null) { + q.setEf(base.getEf()); + } + if (base.getRefineFactor() != null) { + q.setRefineFactor(base.getRefineFactor()); + } + + ScanOptions.Builder b = new ScanOptions.Builder().nearest(q.build()); + boolean fallbackUnit = p.getIndexSegments().isEmpty(); + boolean userRequestedPrefilter = Boolean.TRUE.equals(base.getPrefilter()); + // Lance's Scanner::nearest rejects fragment-restricted scans unless prefilter=true (a + // prefilter expression supplies the per-fragment limit). The lance-core JNI silently + // drops the nearest() error, leaving us with a non-vector scan and no `_distance` + // column. Force prefilter on fallback units so nearest+fragmentIds coexist. 
+ // TODO: revisit once Lance JNI propagates Scanner::nearest errors and lifts this restriction. + if (userRequestedPrefilter || fallbackUnit) { + b.prefilter(true); + } + if (base.getFilter() != null) { + b.filter(base.getFilter()); + } + if (!base.getOutputColumns().isEmpty()) { + b.columns(base.getOutputColumns()); + } + if (Boolean.TRUE.equals(base.getWithRowId()) || schemaHasRowId(p.getSchema())) { + b.withRowId(true); + } + + if (!p.getIndexSegments().isEmpty()) { + b.indexSegments(p.getIndexSegments()); + } else { + b.fragmentIds(p.getFragmentIds()); + } + return b.build(); + } + + private static boolean schemaHasRowId(StructType schema) { + for (StructField field : schema.fields()) { + if (field.name().equals(LanceConstant.ROW_ID)) { + return true; + } + } + return false; + } + + private static DistanceType parseDistanceType(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "l2": + case "euclidean": + return DistanceType.L2; + case "cosine": + return DistanceType.Cosine; + case "dot": + return DistanceType.Dot; + case "hamming": + return DistanceType.Hamming; + default: + throw new IllegalArgumentException("Unsupported distance_type: " + name); + } + } + + private static float[] toFloatArray(List vec) { + float[] arr = new float[vec.size()]; + for (int i = 0; i < arr.length; i++) { + arr[i] = vec.get(i); + } + return arr; + } + + private static String describe(LanceSearchInputPartition p) { + if (!p.getIndexSegments().isEmpty()) { + return "indexed segments=" + p.getIndexSegments(); + } + return "fallback fragments=" + p.getFragmentIds(); + } +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchColumnarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchColumnarPartitionReader.java index 80c30a344..d340717da 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchColumnarPartitionReader.java +++ 
b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchColumnarPartitionReader.java @@ -95,7 +95,7 @@ private void openArrowReader() throws IOException { } } - private ColumnarBatch toColumnarBatch(VectorSchemaRoot root, StructType schema) { + static ColumnarBatch toColumnarBatch(VectorSchemaRoot root, StructType schema) { Map actualFields = new HashMap<>(); for (FieldVector vector : root.getFieldVectors()) { actualFields.put(vector.getField().getName(), vector); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchInputPartition.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchInputPartition.java index 546c371ea..08b9b0259 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchInputPartition.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchInputPartition.java @@ -13,18 +13,88 @@ */ package org.lance.spark.search; +import org.lance.spark.LanceSparkReadOptions; + import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.types.StructType; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + public class LanceSearchInputPartition implements InputPartition { private static final long serialVersionUID = -38612098237192389L; private final StructType schema; private final LanceSearchQuery query; + private final boolean distributed; + private final List fragmentIds; + private final List indexSegments; + private final LanceSparkReadOptions readOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final Map initialStorageOptions; + + /** Non-distributed: namespace.queryTable() path. 
*/ public LanceSearchInputPartition(StructType schema, LanceSearchQuery query) { this.schema = schema; this.query = query; + this.distributed = false; + this.fragmentIds = Collections.emptyList(); + this.indexSegments = Collections.emptyList(); + this.readOptions = null; + this.namespaceImpl = null; + this.namespaceProperties = Collections.emptyMap(); + this.initialStorageOptions = Collections.emptyMap(); + } + + /** + * Distributed: one unit (= one segment OR one uncovered fragment) executed on a Spark task. + * Exactly one of {@code fragmentIds}, {@code indexSegments} must be non-empty. + */ + public LanceSearchInputPartition( + StructType schema, + LanceSearchQuery query, + List fragmentIds, + List indexSegments, + LanceSparkReadOptions readOptions, + String namespaceImpl, + Map namespaceProperties, + Map initialStorageOptions) { + if (readOptions == null) { + throw new IllegalArgumentException("readOptions must be non-null in distributed mode"); + } + if (namespaceImpl == null) { + throw new IllegalArgumentException("namespaceImpl must be non-null in distributed mode"); + } + boolean hasFragments = fragmentIds != null && !fragmentIds.isEmpty(); + boolean hasSegments = indexSegments != null && !indexSegments.isEmpty(); + if (hasFragments == hasSegments) { + throw new IllegalArgumentException( + "Exactly one of fragmentIds or indexSegments must be non-empty"); + } + this.schema = schema; + this.query = query; + this.distributed = true; + this.fragmentIds = + fragmentIds == null ? Collections.emptyList() : Collections.unmodifiableList(fragmentIds); + this.indexSegments = + indexSegments == null + ? Collections.emptyList() + : Collections.unmodifiableList(indexSegments); + this.readOptions = readOptions; + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = + namespaceProperties == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(new HashMap<>(namespaceProperties)); + this.initialStorageOptions = + initialStorageOptions == null + ? 
Collections.emptyMap() + : Collections.unmodifiableMap(new HashMap<>(initialStorageOptions)); } public StructType getSchema() { @@ -34,4 +104,32 @@ public StructType getSchema() { public LanceSearchQuery getQuery() { return query; } + + public boolean isDistributed() { + return distributed; + } + + public List getFragmentIds() { + return fragmentIds; + } + + public List getIndexSegments() { + return indexSegments; + } + + public LanceSparkReadOptions getReadOptions() { + return readOptions; + } + + public String getNamespaceImpl() { + return namespaceImpl; + } + + public Map getNamespaceProperties() { + return namespaceProperties; + } + + public Map getInitialStorageOptions() { + return initialStorageOptions; + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchPartitionReaderFactory.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchPartitionReaderFactory.java index fb8c3d047..f5bd36f25 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchPartitionReaderFactory.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchPartitionReaderFactory.java @@ -24,13 +24,15 @@ public class LanceSearchPartitionReaderFactory implements PartitionReaderFactory @Override public PartitionReader createReader(InputPartition partition) { - return new LanceSearchRowPartitionReader( - new LanceSearchColumnarPartitionReader(asSearchPartition(partition))); + return new LanceSearchRowPartitionReader(createColumnarReader(partition)); } @Override public PartitionReader createColumnarReader(InputPartition partition) { - return new LanceSearchColumnarPartitionReader(asSearchPartition(partition)); + LanceSearchInputPartition p = asSearchPartition(partition); + return p.isDistributed() + ? 
new LanceMergedSearchColumnarPartitionReader(p) + : new LanceSearchColumnarPartitionReader(p); } @Override diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchQuery.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchQuery.java index 5c61f5c74..bd791cb76 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchQuery.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchQuery.java @@ -104,6 +104,82 @@ public Map getNamespaceProperties() { return namespaceProperties; } + public List getOutputColumns() { + return outputColumns; + } + + public Integer getK() { + return k; + } + + public Integer getOffset() { + return offset; + } + + public Long getVersion() { + return version; + } + + public String getFilter() { + return filter; + } + + public Boolean getWithRowId() { + return withRowId; + } + + public List getVector() { + return vector; + } + + public String getVectorColumn() { + return vectorColumn; + } + + public String getDistanceType() { + return distanceType; + } + + public Integer getNprobes() { + return nprobes; + } + + public Integer getEf() { + return ef; + } + + public Integer getRefineFactor() { + return refineFactor; + } + + public Float getLowerBound() { + return lowerBound; + } + + public Float getUpperBound() { + return upperBound; + } + + public Boolean getBypassVectorIndex() { + return bypassVectorIndex; + } + + public Boolean getFastSearch() { + return fastSearch; + } + + public Boolean getPrefilter() { + return prefilter; + } + + public String getTextQuery() { + return textQuery; + } + + public List getSearchColumns() { + return searchColumns; + } + public QueryTableRequest toQueryTableRequest() { QueryTableRequest request = new QueryTableRequest().id(tableId).k(k); request.vector(new QueryTableRequestVector()); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchRowPartitionReader.java 
b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchRowPartitionReader.java index cb5a7e40d..9e12682f2 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchRowPartitionReader.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchRowPartitionReader.java @@ -22,11 +22,11 @@ import java.util.Iterator; public class LanceSearchRowPartitionReader implements PartitionReader { - private final LanceSearchColumnarPartitionReader reader; + private final PartitionReader reader; private Iterator currentRows; private InternalRow currentRecord; - public LanceSearchRowPartitionReader(LanceSearchColumnarPartitionReader reader) { + public LanceSearchRowPartitionReader(PartitionReader reader) { this.reader = reader; } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScan.java index 75ad8d1c7..f7395b7b9 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScan.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScan.java @@ -13,23 +13,79 @@ */ package org.lance.spark.search; +import org.lance.Dataset; +import org.lance.Fragment; +import org.lance.index.Index; +import org.lance.index.IndexCriteria; +import org.lance.index.IndexDescription; +import org.lance.schema.LanceField; +import org.lance.spark.LanceSparkReadOptions; +import org.lance.spark.search.LanceSearchQuery.SearchType; +import org.lance.spark.utils.Utils; + +import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; public class LanceSearchScan implements Scan, Batch, Serializable { private static final long serialVersionUID = -120398471239847123L; + private static final Logger LOG = LoggerFactory.getLogger(LanceSearchScan.class); + private static final Set VECTOR_INDEX_TYPES = + new HashSet<>( + Arrays.asList( + "VECTOR", + "IVF_FLAT", + "IVF_PQ", + "IVF_SQ", + "IVF_HNSW_FLAT", + "IVF_HNSW_SQ", + "IVF_HNSW_PQ", + "IVF_RQ")); private final StructType schema; private final LanceSearchQuery query; + private final boolean distributed; + private final LanceSparkReadOptions readOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final Map initialStorageOptions; - public LanceSearchScan(StructType schema, LanceSearchQuery query) { + public LanceSearchScan( + StructType schema, + LanceSearchQuery query, + boolean distributed, + LanceSparkReadOptions readOptions, + String namespaceImpl, + Map namespaceProperties, + Map initialStorageOptions) { this.schema = schema; this.query = query; + this.distributed = distributed; + this.readOptions = readOptions; + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = + namespaceProperties == null ? Collections.emptyMap() : namespaceProperties; + this.initialStorageOptions = + initialStorageOptions == null ? 
Collections.emptyMap() : initialStorageOptions; } @Override @@ -49,11 +105,244 @@ public Batch toBatch() { @Override public InputPartition[] planInputPartitions() { - return new InputPartition[] {new LanceSearchInputPartition(schema, query)}; + if (!distributed) { + return new InputPartition[] {new LanceSearchInputPartition(schema, query)}; + } + return planDistributed(); } @Override public PartitionReaderFactory createReaderFactory() { return new LanceSearchPartitionReaderFactory(); } + + private InputPartition[] planDistributed() { + Dataset dataset = + Utils.openDatasetBuilder(readOptions).initialStorageOptions(initialStorageOptions).build(); + try { + Set existingFragments = new HashSet<>(); + for (Fragment fragment : dataset.getFragments()) { + existingFragments.add(fragment.getId()); + } + if (existingFragments.isEmpty()) { + LOG.info("Lance distributed vector search: empty dataset, returning empty result"); + return new InputPartition[0]; + } + + String column = resolveVectorColumn(dataset); + Optional vectorIndex = selectVectorIndex(dataset, column); + boolean fastSearch = Boolean.TRUE.equals(query.getFastSearch()); + + List units = planUnits(existingFragments, vectorIndex, fastSearch); + + long indexedCount = units.stream().filter(u -> !u.indexSegments.isEmpty()).count(); + LOG.info( + "Lance distributed vector search: column={}, indexName={}, units={} " + + "(indexed={}, fallback={}), candidateK={}", + column, + vectorIndex.map(VectorIndexInfo::getIndexName).orElse("none"), + units.size(), + indexedCount, + units.size() - indexedCount, + query.getK()); + + if (units.isEmpty()) { + return new InputPartition[0]; + } + + LanceSearchQuery resolvedQuery = withResolvedVectorColumn(query, column); + InputPartition[] result = new InputPartition[units.size()]; + for (int i = 0; i < units.size(); i++) { + PlannedUnit u = units.get(i); + result[i] = + new LanceSearchInputPartition( + schema, + resolvedQuery, + u.fragmentIds, + u.indexSegments, + readOptions, + 
namespaceImpl, + namespaceProperties, + initialStorageOptions); + } + return result; + } finally { + dataset.close(); + } + } + + private static List planUnits( + Set existingFragments, Optional vectorIndex, boolean fastSearch) { + if (existingFragments.isEmpty()) { + return Collections.emptyList(); + } + List units = new ArrayList<>(); + Set indexedFragments = new HashSet<>(); + if (vectorIndex.isPresent()) { + for (VectorIndexSegment segment : vectorIndex.get().getSegments()) { + Set covered = new HashSet<>(segment.getFragmentIds()); + covered.retainAll(existingFragments); + if (covered.isEmpty()) { + continue; + } + indexedFragments.addAll(covered); + units.add(PlannedUnit.indexed(segment.getUuid())); + } + } + if (!fastSearch) { + Set uncovered = new TreeSet<>(existingFragments); + uncovered.removeAll(indexedFragments); + for (Integer fragmentId : uncovered) { + units.add(PlannedUnit.fallback(fragmentId)); + } + } + return units; + } + + private String resolveVectorColumn(Dataset dataset) { + String declared = query.getVectorColumn(); + if (declared != null && !declared.isEmpty()) { + return declared; + } + for (LanceField field : dataset.getLanceSchema().fields()) { + if (field.getType() instanceof ArrowType.FixedSizeList) { + return field.getName(); + } + } + throw new IllegalArgumentException( + "VECTOR_SEARCH could not auto-detect a vector column; pass vector_column explicitly"); + } + + private static Optional selectVectorIndex(Dataset dataset, String column) { + List indices; + try { + indices = dataset.describeIndices(new IndexCriteria.Builder().build()); + } catch (Exception e) { + LOG.warn("describeIndices failed, falling back to flat search: {}", e.getMessage()); + return Optional.empty(); + } + + Map fieldIdToName = new HashMap<>(); + for (LanceField field : dataset.getLanceSchema().fields()) { + fieldIdToName.put(field.getId(), field.getName()); + } + + for (IndexDescription idx : indices) { + if (!isVectorIndex(idx)) { + continue; + } + List 
fieldNames = new ArrayList<>(); + for (Integer fieldId : idx.getFieldIds()) { + String name = fieldIdToName.get(fieldId); + if (name != null) { + fieldNames.add(name); + } + } + if (!fieldNames.contains(column)) { + continue; + } + List segments = new ArrayList<>(); + for (Index segment : idx.getSegments()) { + UUID uuid = segment.uuid(); + Set fragmentIds = segment.fragments().map(HashSet::new).orElseGet(HashSet::new); + segments.add(new VectorIndexSegment(uuid, fragmentIds)); + } + return Optional.of(new VectorIndexInfo(idx.getName(), segments)); + } + return Optional.empty(); + } + + private static boolean isVectorIndex(IndexDescription idx) { + String type = idx.getIndexType(); + if (type == null) { + return false; + } + return VECTOR_INDEX_TYPES.contains(type.toUpperCase(Locale.ROOT)); + } + + private static LanceSearchQuery withResolvedVectorColumn( + LanceSearchQuery base, String resolvedColumn) { + if (base.getVectorColumn() != null && !base.getVectorColumn().isEmpty()) { + return base; + } + return LanceSearchQuery.builder(SearchType.VECTOR) + .tableId(base.getTableId()) + .namespaceImpl(base.getNamespaceImpl()) + .namespaceProperties(base.getNamespaceProperties()) + .outputColumns(base.getOutputColumns()) + .topK(base.getK()) + .offset(base.getOffset()) + .version(base.getVersion()) + .filter(base.getFilter()) + .withRowId(base.getWithRowId()) + .vector(base.getVector()) + .vectorColumn(resolvedColumn) + .distanceType(base.getDistanceType()) + .nprobes(base.getNprobes()) + .ef(base.getEf()) + .refineFactor(base.getRefineFactor()) + .lowerBound(base.getLowerBound()) + .upperBound(base.getUpperBound()) + .bypassVectorIndex(base.getBypassVectorIndex()) + .fastSearch(base.getFastSearch()) + .prefilter(base.getPrefilter()) + .build(); + } + + /** One planned execution unit: either a single-segment indexed scan, or a single-fragment KNN. 
*/ + private static final class PlannedUnit { + final List fragmentIds; + final List indexSegments; + + private PlannedUnit(List fragmentIds, List indexSegments) { + this.fragmentIds = fragmentIds; + this.indexSegments = indexSegments; + } + + static PlannedUnit indexed(UUID segmentUuid) { + return new PlannedUnit(Collections.emptyList(), Collections.singletonList(segmentUuid)); + } + + static PlannedUnit fallback(Integer fragmentId) { + return new PlannedUnit(Collections.singletonList(fragmentId), Collections.emptyList()); + } + } + + /** Lightweight view of a vector index for planning. */ + private static final class VectorIndexInfo { + private final String indexName; + private final List segments; + + VectorIndexInfo(String indexName, List segments) { + this.indexName = indexName; + this.segments = Collections.unmodifiableList(new ArrayList<>(segments)); + } + + String getIndexName() { + return indexName; + } + + List getSegments() { + return segments; + } + } + + /** One physical segment of a vector index. 
*/ + private static final class VectorIndexSegment { + private final UUID uuid; + private final Set fragmentIds; + + VectorIndexSegment(UUID uuid, Set fragmentIds) { + this.uuid = uuid; + this.fragmentIds = Collections.unmodifiableSet(new HashSet<>(fragmentIds)); + } + + UUID getUuid() { + return uuid; + } + + Set getFragmentIds() { + return fragmentIds; + } + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScanBuilder.java index f29260d83..a8ea6b0b9 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchScanBuilder.java @@ -13,21 +13,49 @@ */ package org.lance.spark.search; +import org.lance.spark.LanceSparkReadOptions; + import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.types.StructType; +import java.util.Map; + public class LanceSearchScanBuilder implements ScanBuilder { private final StructType schema; private final LanceSearchQuery query; + private final boolean distributed; + private final LanceSparkReadOptions readOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final Map initialStorageOptions; - public LanceSearchScanBuilder(StructType schema, LanceSearchQuery query) { + public LanceSearchScanBuilder( + StructType schema, + LanceSearchQuery query, + boolean distributed, + LanceSparkReadOptions readOptions, + String namespaceImpl, + Map namespaceProperties, + Map initialStorageOptions) { this.schema = schema; this.query = query; + this.distributed = distributed; + this.readOptions = readOptions; + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = namespaceProperties; + this.initialStorageOptions = initialStorageOptions; } @Override public Scan build() { - return new 
LanceSearchScan(schema, query); + return new LanceSearchScan( + schema, + query, + distributed, + readOptions, + namespaceImpl, + namespaceProperties, + initialStorageOptions); } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchTable.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchTable.java index c34a65165..36d71e447 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchTable.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/search/LanceSearchTable.java @@ -13,6 +13,8 @@ */ package org.lance.spark.search; +import org.lance.spark.LanceSparkReadOptions; + import org.apache.spark.sql.connector.catalog.SupportsRead; import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.read.ScanBuilder; @@ -20,22 +22,58 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.util.Collections; +import java.util.Map; +import java.util.Objects; import java.util.Set; public class LanceSearchTable implements SupportsRead { private final String name; private final StructType schema; private final LanceSearchQuery query; + private final boolean distributed; + private final LanceSparkReadOptions readOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final Map initialStorageOptions; - public LanceSearchTable(String name, StructType schema, LanceSearchQuery query) { + public LanceSearchTable( + String name, + StructType schema, + LanceSearchQuery query, + boolean distributed, + LanceSparkReadOptions readOptions, + String namespaceImpl, + Map namespaceProperties, + Map initialStorageOptions) { this.name = name; this.schema = schema; this.query = query; + this.distributed = distributed; + if (distributed) { + this.readOptions = Objects.requireNonNull(readOptions, "readOptions"); + this.namespaceImpl = Objects.requireNonNull(namespaceImpl, "namespaceImpl"); + 
this.namespaceProperties = + namespaceProperties == null ? Collections.emptyMap() : namespaceProperties; + this.initialStorageOptions = + initialStorageOptions == null ? Collections.emptyMap() : initialStorageOptions; + } else { + this.readOptions = null; + this.namespaceImpl = null; + this.namespaceProperties = Collections.emptyMap(); + this.initialStorageOptions = Collections.emptyMap(); + } } @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return new LanceSearchScanBuilder(schema, query); + return new LanceSearchScanBuilder( + schema, + query, + distributed, + readOptions, + namespaceImpl, + namespaceProperties, + initialStorageOptions); } @Override diff --git a/lance-spark-base_2.12/src/main/scala/org/lance/spark/search/LanceSearchTableFunctions.scala b/lance-spark-base_2.12/src/main/scala/org/lance/spark/search/LanceSearchTableFunctions.scala index ce117c8eb..529df99d6 100644 --- a/lance-spark-base_2.12/src/main/scala/org/lance/spark/search/LanceSearchTableFunctions.scala +++ b/lance-spark-base_2.12/src/main/scala/org/lance/spark/search/LanceSearchTableFunctions.scala @@ -16,10 +16,12 @@ package org.lance.spark.search import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionInfo, Literal, SortOrder} import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression, Literal} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan, Offset, Project, Sort} import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.types._ import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import 
org.apache.spark.sql.util.CaseInsensitiveStringMap import org.lance.spark.LanceDataset @@ -222,12 +224,74 @@ object LanceSearchTableFunctions { schema: StructType, query: LanceSearchQuery, resolved: ResolvedLanceTable): LogicalPlan = { - val table = new LanceSearchTable(functionName, schema, query) - DataSourceV2Relation.create( + val distributed = shouldUseDistributed(query) + + val table = + if (distributed) { + new LanceSearchTable( + functionName, + schema, + query, + /* distributed = */ true, + resolved.table.readOptions(), + resolved.table.getNamespaceImpl, + resolved.table.getNamespaceProperties, + resolved.table.getInitialStorageOptions) + } else { + new LanceSearchTable( + functionName, + schema, + query, + /* distributed = */ false, + null, + null, + null, + null) + } + + val rel = DataSourceV2Relation.create( table, Some(resolved.catalog), Some(resolved.identifier), CaseInsensitiveStringMap.empty()) + + if (distributed) { + val distanceAttr = + rel.output + .find(attr => attr.name == DistanceMetricColumn) + .getOrElse { + throw new IllegalStateException( + s"Internal column ${DistanceMetricColumn} is missing from vector_search plan.") + } + val sorted = Sort(Seq(SortOrder(distanceAttr, Ascending)), global = true, rel) + // Each worker over-fetches `k + offset` rows (LanceSearchQuery.k = userK + offset, set by + // LanceSearchTableFunctions.vectorSearch). Globally we sort, drop the first `offset` rows, + // then take `userK`. 
+ val totalCandidates = query.getK + val effectiveOffset = + if (query.getOffset != null) query.getOffset.intValue() else 0 + val userK = math.max(totalCandidates - effectiveOffset, 0) + val candidateLimited = + GlobalLimit(Literal(totalCandidates), LocalLimit(Literal(totalCandidates), sorted)) + if (effectiveOffset > 0) { + GlobalLimit( + Literal(userK), + LocalLimit(Literal(userK), Offset(Literal(effectiveOffset), candidateLimited))) + } else { + candidateLimited + } + } else { + rel + } + } + + private def shouldUseDistributed(query: LanceSearchQuery): Boolean = { + val spark = SparkSession.active + val enabled = spark.conf.get("spark.sql.lance.search.distributed.enabled", "true").toBoolean + if (!enabled) return false + if (query.getSearchType != SearchType.VECTOR) return false + if (java.lang.Boolean.TRUE == query.getBypassVectorIndex) return false + true } private def hybridRelation( diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/search/BaseSparkDistributedVectorSearchTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/search/BaseSparkDistributedVectorSearchTest.java new file mode 100644 index 000000000..853ef599f --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/search/BaseSparkDistributedVectorSearchTest.java @@ -0,0 +1,195 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.lance.spark.search; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Integration tests for distributed VECTOR_SEARCH (the path through {@link LanceSearchTable} with + * {@code distributed=true}). + * + *

These tests cover the fallback-only path: with no vector index built on the table, {@link + * LanceSearchScan} emits one fallback partition per fragment and each Spark task runs flat KNN. + * Indexed-unit behavior (IVF_PQ etc.) is intentionally not exercised here yet — see the + * {@code @Disabled} placeholder at the bottom of this class. + */ +public abstract class BaseSparkDistributedVectorSearchTest { + private static final String CATALOG_NAME = "lance_dist_search"; + private SparkSession spark; + + @TempDir Path tempDir; + + @BeforeEach + void setup() { + spark = + SparkSession.builder() + .appName("lance-distributed-vector-search-test") + .master("local[2]") + .config( + "spark.sql.catalog." + CATALOG_NAME, "org.lance.spark.LanceNamespaceSparkCatalog") + .config( + "spark.sql.extensions", "org.lance.spark.extensions.LanceSparkSessionExtensions") + .config("spark.sql.catalog." + CATALOG_NAME + ".impl", "dir") + .config("spark.sql.catalog." + CATALOG_NAME + ".root", tempDir.toString()) + .getOrCreate(); + spark.sql("CREATE NAMESPACE " + CATALOG_NAME + ".default"); + } + + @AfterEach + void tearDown() throws IOException { + if (spark != null) { + spark.close(); + } + } + + /** + * Build a 5-fragment table with no vector index. The planner emits five fallback units, one per + * fragment. Each fragment has a single row; vectors are spaced so the closest to (0,0,0,0) is id + * 0, then 1, then 2, etc. 
+ */ + private String createFiveFragmentTable() { + String fullName = CATALOG_NAME + ".default.dist_vec"; + spark.sql( + "CREATE TABLE " + + fullName + + " (id INT NOT NULL, vector ARRAY NOT NULL) USING lance " + + "TBLPROPERTIES ('vector.arrow.fixed-size-list.size' = '4')"); + spark.sql("INSERT INTO " + fullName + " VALUES (0, array(0.0, 0.0, 0.0, 0.0))"); + spark.sql("INSERT INTO " + fullName + " VALUES (1, array(1.0, 1.0, 1.0, 1.0))"); + spark.sql("INSERT INTO " + fullName + " VALUES (2, array(2.0, 2.0, 2.0, 2.0))"); + spark.sql("INSERT INTO " + fullName + " VALUES (3, array(3.0, 3.0, 3.0, 3.0))"); + spark.sql("INSERT INTO " + fullName + " VALUES (4, array(4.0, 4.0, 4.0, 4.0))"); + return fullName; + } + + @Test + void distributedAndSinglePartitionAgreeOnTopK() { + String fullName = createFiveFragmentTable(); + String sql = + "SELECT id, _distance FROM VECTOR_SEARCH('" + + fullName + + "', array(0.0, 0.0, 0.0, 0.0), 5) ORDER BY _distance, id"; + + spark.conf().set("spark.sql.lance.search.distributed.enabled", "false"); + List single = new ArrayList<>(spark.sql(sql).collectAsList()); + + spark.conf().set("spark.sql.lance.search.distributed.enabled", "true"); + List distributed = new ArrayList<>(spark.sql(sql).collectAsList()); + + Comparator byIdThenDist = + Comparator.comparing(r -> r.getInt(0)) + .thenComparingDouble(r -> (double) r.getFloat(1)); + single.sort(byIdThenDist); + distributed.sort(byIdThenDist); + + assertEquals(single.size(), distributed.size(), "result row count must match"); + for (int i = 0; i < single.size(); i++) { + assertEquals( + single.get(i).getInt(0), distributed.get(i).getInt(0), "row " + i + " id mismatch"); + assertEquals( + single.get(i).getFloat(1), + distributed.get(i).getFloat(1), + 1e-3f, + "row " + i + " _distance mismatch"); + } + } + + @Test + void distributedReturnsExactlyKRows() { + String fullName = createFiveFragmentTable(); + spark.conf().set("spark.sql.lance.search.distributed.enabled", "true"); + List rows = + spark + 
.sql("SELECT id FROM VECTOR_SEARCH('" + fullName + "', array(0.0, 0.0, 0.0, 0.0), 3)") + .collectAsList(); + assertEquals(3, rows.size()); + } + + @Test + void distributedKLargerThanRowsReturnsAllRows() { + String fullName = createFiveFragmentTable(); + spark.conf().set("spark.sql.lance.search.distributed.enabled", "true"); + List rows = + spark + .sql("SELECT id FROM VECTOR_SEARCH('" + fullName + "', array(0.0, 0.0, 0.0, 0.0), 100)") + .collectAsList(); + assertEquals(5, rows.size()); + } + + @Test + void distributedEmptyTableReturnsZeroRows() { + String fullName = CATALOG_NAME + ".default.empty_vec"; + spark.sql( + "CREATE TABLE " + + fullName + + " (id INT NOT NULL, vector ARRAY NOT NULL) USING lance " + + "TBLPROPERTIES ('vector.arrow.fixed-size-list.size' = '4')"); + spark.conf().set("spark.sql.lance.search.distributed.enabled", "true"); + List rows = + spark + .sql("SELECT id FROM VECTOR_SEARCH('" + fullName + "', array(0.0, 0.0, 0.0, 0.0), 5)") + .collectAsList(); + assertEquals(0, rows.size()); + } + + @Test + void distributedNearestRowIsClosest() { + String fullName = createFiveFragmentTable(); + spark.conf().set("spark.sql.lance.search.distributed.enabled", "true"); + List rows = + spark + .sql( + "SELECT id, _distance FROM VECTOR_SEARCH('" + + fullName + + "', array(0.0, 0.0, 0.0, 0.0), 1)") + .collectAsList(); + assertEquals(1, rows.size()); + assertEquals(0, rows.get(0).getInt(0), "closest id to origin should be 0"); + assertEquals(0.0f, rows.get(0).getFloat(1), 1e-4f); + } + + /** + * TODO: end-to-end coverage for indexed-unit path with IVF_PQ. + * + *

lance-spark does not yet support {@code CREATE INDEX ... USING IVF_PQ} via SQL; once that + * DDL is wired through, this test should: + * + *

    + *
  1. Build a table large enough for IVF_PQ training (>= 256 rows). + *
  2. {@code CREATE INDEX ... USING IVF_PQ} on the vector column. + *
  3. Enable {@code spark.sql.lance.search.distributed.enabled=true}. + *
  4. Run VECTOR_SEARCH and assert {@link LanceSearchScan} planned indexed units (one per + * segment) and the merged top-k matches the non-distributed reference run. + *
+ */ + @Test + @Disabled("TODO: pending CREATE INDEX ... USING IVF_PQ support in lance-spark SQL extensions") + void distributedIndexedUnitWithIvfPqIndex() { + // Intentionally empty until IVF_PQ DDL is available. + } +}