From 4b84c85ee3324667b094ad0ad1dd7cc8a0fe9a54 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 13 May 2026 09:44:29 +0200 Subject: [PATCH] Add async-IO pipeline on the layer-0 FusedPQ search path When the GraphSearcher iterates the candidate heap on level 0 with a FusedPQ score function, each visited node requires a contiguous read of the per-node block (PQ codes + degree + neighbor ids). With a remote-storage RandomAccessReader, that read dominates wall time: profiling a workload running on herddb's RemoteRandomAccessReader shows 53% of FJ-worker wall time blocked in RemoteRandomAccessReader.readFully serving FusedPQ$PackedNeighbors.readInto. The underlying network client supports concurrent reads but the jvector API exposes only synchronous reads, so the IO is forced to serialize. This change introduces a non-blocking range read on RandomAccessReader and uses it from GraphSearcher to overlap the next visited node's IO with the current node's similarity compute: * RandomAccessReader.readRangeAsync(long, int): default sync fallback (position-preserving) so local / mmap readers are unaffected; network-backed readers can override to dispatch a true async read. * OnDiskGraphIndex.View.readPackedNeighborsAsync(int): reads the contiguous per-node block in one shot and parses it into a self-contained PackedNeighborData (codes + neighbors[] + degree), so two reads can be in flight without sharing scratch buffers. * ScoreFunction.enableSimilarityToNeighbors(int, ByteSequence) + FusedPQDecoder override: consume already-loaded codes, bypassing the implicit disk read inside the decoder. * GraphSearcher.searchOneLayer dispatches to a new searchOneLayerAsync when conditions match (level 0, FusedPQ view, score-fn supports neighbor-batch) and the new setAsyncPipelineEnabled(true) is set. The async loop is a 2-slot pipeline: while waiting on the current node's read, it peeks the candidate heap and starts a speculative read for the next likely-visited node. 
On peek hits, that future is consumed for free; on peek misses, the bytes still land in the reader's block cache (if any) for later reuse. Node visit order and pop scoring are unchanged, so search results are bit-equivalent to the sync path. A new TestAsyncPipelineSearch checks (a) the default readRangeAsync returns the same bytes as a sync seek+readFully and preserves the reader position, and (b) sync vs pipeline search produce identical node ids and scores on a deterministic FusedPQ graph. Opt-in via GraphSearcher.setAsyncPipelineEnabled(true); intended for callers that supply a RandomAccessReader with a non-blocking readRangeAsync override. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../jvector/disk/RandomAccessReader.java | 40 +++++ .../jbellis/jvector/graph/GraphSearcher.java | 141 +++++++++++++++ .../jvector/graph/disk/OnDiskGraphIndex.java | 72 ++++++++ .../graph/similarity/ScoreFunction.java | 13 ++ .../jvector/quantization/FusedPQDecoder.java | 8 + .../graph/TestAsyncPipelineSearch.java | 167 ++++++++++++++++++ 6 files changed, 441 insertions(+) create mode 100644 jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java b/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java index 09e751e1a..4d3497617 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** * This is a subset of DataInput, plus seek and readFully methods, which allows implementations @@ -127,4 +128,43 @@ default void readFully(float[] floats) throws IOException { * @throws IOException if an I/O error occurs */ long length() throws IOException; + + /** + * Asynchronously read {@code 
length} bytes starting at {@code offset}. The returned future + * completes with a ByteBuffer positioned at 0 and limited to {@code length}. + * + *

Contract:
 * <ul>
 *   <li>the returned buffer is positioned at 0 with its limit set to {@code length}</li>
 *   <li>the reader's current position is left unchanged by this call</li>
 *   <li>on read failure the future completes exceptionally with the underlying {@code IOException}</li>
 * </ul>

The default implementation is a synchronous fallback that saves and restores the current + * position around a {@code seek}/{@code readFully} pair, returning a completed future. This + * preserves correctness for local file/mmap readers without giving them any actual concurrency + * benefit; readers that wrap a non-blocking backend should override. + * + * @param offset starting offset + * @param length number of bytes to read + * @return a future completing with the read bytes + */ + default CompletableFuture readRangeAsync(long offset, int length) { + try { + long saved = getPosition(); + byte[] bytes = new byte[length]; + seek(offset); + readFully(bytes); + seek(saved); + return CompletableFuture.completedFuture(ByteBuffer.wrap(bytes)); + } catch (IOException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(e); + return failed; + } + } } diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/GraphSearcher.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/GraphSearcher.java index c75c6b163..0aee891df 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/GraphSearcher.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/GraphSearcher.java @@ -26,6 +26,7 @@ import io.github.jbellis.jvector.annotations.Experimental; import io.github.jbellis.jvector.graph.ImmutableGraphIndex.NodeAtLevel; +import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex; import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider; import io.github.jbellis.jvector.graph.similarity.ScoreFunction; import io.github.jbellis.jvector.graph.similarity.SearchScoreProvider; @@ -42,6 +43,7 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** @@ -65,6 +67,7 @@ public class GraphSearcher implements Closeable { private CachingReranker cachingReranker; private boolean pruneSearch; + private 
boolean asyncPipelineEnabled; private final ScoreTracker.ScoreTrackerFactory scoreTrackerFactory; private int visitedCount; @@ -128,6 +131,27 @@ public void usePruning(boolean usage) { pruneSearch = usage; } + /** + * Enable a 2-slot IO pipeline on the layer-0 FusedPQ search path. When enabled (and the + * conditions are met — graph view is an {@link OnDiskGraphIndex.View} with a FUSED_PQ feature, + * and the score function supports neighbor-batch similarity), {@link #searchOneLayer} kicks + * off an asynchronous read for the next likely-visited node before waiting on the current + * node's read. With a {@link io.github.jbellis.jvector.disk.RandomAccessReader} implementation + * that overrides {@code readRangeAsync} to issue truly non-blocking IO (e.g. a network-backed + * reader), this overlaps remote IO with similarity compute and reduces search wall time. + * + *

With the default synchronous fallback for {@code readRangeAsync}, this mode degrades to + * serialized reads with a small per-node {@link CompletableFuture} overhead and per-call + * buffer allocation — no concurrency benefit. Leave this disabled for in-memory / mmap + * readers; enable it for readers backed by a parallel-capable network client. + * + *

Disabled by default. + */ + @Experimental + public void setAsyncPipelineEnabled(boolean enabled) { + this.asyncPipelineEnabled = enabled; + } + /** * Convenience function for simple one-off searches. It is caller's responsibility to make sure that it * is the unique owner of the vectors instance passed in here. @@ -413,6 +437,17 @@ public void searchOneLayer(SearchScoreProvider scoreProvider, int level, Bits acceptOrdsThisLayer) { + // Async-IO pipeline path: only when explicitly enabled and the graph/score-function shape + // matches. Falls back to the original synchronous loop otherwise. + if (asyncPipelineEnabled + && level == 0 + && scoreProvider.scoreFunction().supportsSimilarityToNeighbors() + && view instanceof OnDiskGraphIndex.View + && ((OnDiskGraphIndex.View) view).hasFusedPQ()) { + searchOneLayerAsync(scoreProvider, rerankK, threshold, acceptOrdsThisLayer); + return; + } + try { assert approximateResults.size() == 0; // should be cleared by setEntryPointsFromPreviousLayer approximateResults.setMaxSize(rerankK); @@ -459,6 +494,112 @@ public void searchOneLayer(SearchScoreProvider scoreProvider, } } + /** + * 2-slot pipelined variant of {@link #searchOneLayer} for the layer-0 FusedPQ path. While + * waiting on the read for the current frontier node, kicks off an asynchronous read for the + * next likely-visited node (the heap's top). When the speculative peek matches the heap's + * top after the current node's neighbors are scored and pushed, the prefetched future is + * consumed for free; when it doesn't, the future's bytes still land in the underlying + * reader's block cache (if any) and the search continues correctly. + * + *

Semantics relative to {@link #searchOneLayer}: same nodes are visited in the same order + * (popping is deterministic given the heap state), so search results are bit-equivalent. Only + * IO scheduling differs. + */ + private void searchOneLayerAsync(SearchScoreProvider scoreProvider, + int rerankK, + float threshold, + Bits acceptOrdsThisLayer) { + try { + assert approximateResults.size() == 0; + approximateResults.setMaxSize(rerankK); + + var scoreTracker = scoreTrackerFactory.getScoreTracker(pruneSearch, rerankK, threshold); + OnDiskGraphIndex.View odView = (OnDiskGraphIndex.View) view; + var scoreFunction = scoreProvider.scoreFunction(); + + if (candidates.size() == 0 || stopSearch(candidates, scoreTracker, rerankK, threshold)) { + return; + } + + float currentScore = candidates.topScore(); + int currentNode = candidates.pop(); + CompletableFuture currentRead = + odView.readPackedNeighborsAsync(currentNode); + + while (currentRead != null) { + // Speculatively start the next read BEFORE waiting on the current one. + int peekedNode = -1; + float peekedScore = 0f; + CompletableFuture nextRead = null; + if (candidates.size() > 0) { + peekedScore = candidates.topScore(); + peekedNode = candidates.topNode(); + nextRead = odView.readPackedNeighborsAsync(peekedNode); + } + + // Park on the current read; the speculative next read overlaps with what follows. + OnDiskGraphIndex.PackedNeighborData data = currentRead.join(); + + // Result/threshold bookkeeping — identical to the sync path. + if (acceptOrdsThisLayer.get(currentNode) && currentScore >= threshold) { + addTopCandidate(currentNode, currentScore, rerankK); + } + + // Skip neighbor expansion if shouldStop says we're in drain mode AND we still + // have enough queued candidates to fill the result set. Same condition as the + // sync path's `continue`. 
+ boolean skipExpand = scoreTracker.shouldStop() + && candidates.size() >= rerankK - approximateResults.size(); + + if (!skipExpand) { + expandedCountBaseLayer++; + expandedCount++; + + scoreFunction.enableSimilarityToNeighbors(currentNode, data.codes); + for (int i = 0; i < data.degree; i++) { + int friend = data.neighbors[i]; + if (visited.add(friend)) { + float friendSim = scoreFunction.similarityToNeighbor(currentNode, i); + scoreTracker.track(friendSim); + candidates.push(friend, friendSim); + visitedCount++; + } + } + } + + // Stop check now that the queue reflects this iteration's pushes. + if (candidates.size() == 0 + || stopSearch(candidates, scoreTracker, rerankK, threshold)) { + // nextRead (if any) is left to land in the block cache; no leak (on-heap buffers). + break; + } + + // Advance the pipeline. + if (nextRead == null) { + currentScore = candidates.topScore(); + currentNode = candidates.pop(); + currentRead = odView.readPackedNeighborsAsync(currentNode); + } else if (candidates.topNode() == peekedNode) { + candidates.pop(); + currentScore = peekedScore; + currentNode = peekedNode; + currentRead = nextRead; + } else { + // Compute step pushed a candidate that displaced peekedNode from the top. + // Drop the speculative result locally; bytes are in the block cache so the + // eventual visit to peekedNode will hit there. 
+ currentScore = candidates.topScore(); + currentNode = candidates.pop(); + currentRead = odView.readPackedNeighborsAsync(currentNode); + } + } + } catch (Throwable t) { + approximateResults.clear(); + throw t; + } + } + private void searchLayer0(int topK, int rerankK, float threshold) { // rR is persistent to save on allocations rerankedResults.clear(); diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndex.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndex.java index 9ab122392..4270225c9 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndex.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndex.java @@ -38,6 +38,7 @@ import io.github.jbellis.jvector.util.RamUsageEstimator; import io.github.jbellis.jvector.vector.VectorSimilarityFunction; import io.github.jbellis.jvector.vector.VectorizationProvider; +import io.github.jbellis.jvector.vector.types.ByteSequence; import io.github.jbellis.jvector.vector.types.VectorFloat; import io.github.jbellis.jvector.vector.types.VectorTypeSupport; @@ -48,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -465,6 +467,25 @@ public double getAverageDegree(int level) { return (double) sum / it.size(); } + /** + * Parsed result of {@link View#readPackedNeighborsAsync(int)}. Self-contained — independent of + * the View's mutable scratch buffers — so multiple instances can coexist when the search loop + * is pipelining reads. 
+ */ + public static final class PackedNeighborData { + public final int node; + public final ByteSequence codes; + public final int[] neighbors; + public final int degree; + + PackedNeighborData(int node, ByteSequence codes, int[] neighbors, int degree) { + this.node = node; + this.codes = codes; + this.neighbors = neighbors; + this.degree = degree; + } + } + public class View implements FeatureSource, ScoringView, RandomAccessVectorValues { protected final RandomAccessReader reader; private final int[] neighbors; @@ -614,6 +635,57 @@ public Int2ObjectHashMap getInlineSourceFeatures() { } } + /** + * @return true if this graph carries a FUSED_PQ feature on layer 0 (i.e. the pipelined + * async search path is applicable). + */ + public boolean hasFusedPQ() { + Feature f = features.get(FeatureId.FUSED_PQ); + return f != null && f.isFused(); + } + + /** + * Asynchronously read the entire layer-0 block for {@code node} (PQ codes + degree + + * neighbor ids) in a single contiguous range. Used by GraphSearcher's pipelined search + * path to overlap remote IO with similarity compute. + * + *

The returned {@link PackedNeighborData} is self-contained — it does not share state + * with the View's mutable scratch fields, so two reads can be in flight concurrently. + * + * @throws UnsupportedOperationException if the graph has no FUSED_PQ feature + */ + public CompletableFuture readPackedNeighborsAsync(int node) { + Feature feature = features.get(FeatureId.FUSED_PQ); + if (feature == null || !feature.isFused()) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new UnsupportedOperationException("readPackedNeighborsAsync requires a FusedPQ graph")); + return failed; + } + long start = baseNodeOffsetFor(node); + int maxDegree = layerInfo.get(0).degree; + int totalLen = Integer.BYTES + inlineBlockSize + Integer.BYTES * (maxDegree + 1); + int codesOffset = Integer.BYTES + inlineOffsets.get(FeatureId.FUSED_PQ); + int codesLength = feature.featureSize(); + int degreePos = Integer.BYTES + inlineBlockSize; + + return reader.readRangeAsync(start, totalLen).thenApply(buf -> { + int actualDegree = buf.getInt(degreePos); + assert actualDegree <= maxDegree + : String.format("Node %d neighborCount %d > M %d", node, actualDegree, maxDegree); + int[] nbrs = new int[actualDegree]; + int idsBase = degreePos + Integer.BYTES; + for (int i = 0; i < actualDegree; i++) { + nbrs[i] = buf.getInt(idsBase + Integer.BYTES * i); + } + ByteSequence codes = vectorTypeSupport.createByteSequence(codesLength); + for (int i = 0; i < codesLength; i++) { + codes.set(i, buf.get(codesOffset + i)); + } + return new PackedNeighborData(node, codes, nbrs, actualDegree); + }); + } + @Override public void processNeighbors(int level, int node, ScoreFunction scoreFunction, IntMarker visited, NeighborProcessor neighborProcessor) { var useEdgeLoading = scoreFunction.supportsSimilarityToNeighbors(); diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/similarity/ScoreFunction.java 
b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/similarity/ScoreFunction.java index 9ae58fab1..69b98c042 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/similarity/ScoreFunction.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/similarity/ScoreFunction.java @@ -17,6 +17,7 @@ package io.github.jbellis.jvector.graph.similarity; import io.github.jbellis.jvector.vector.VectorizationProvider; +import io.github.jbellis.jvector.vector.types.ByteSequence; import io.github.jbellis.jvector.vector.types.VectorTypeSupport; /** @@ -58,6 +59,18 @@ default float similarityToNeighbor(int origin, int neighborIndex) { */ default void enableSimilarityToNeighbors(int origin) {} + /** + * Variant of {@link #enableSimilarityToNeighbors(int)} that consumes a caller-supplied buffer of + * already-loaded neighbor codes. Used by the pipelined search path that pre-fetches the codes + * asynchronously while computing similarities for the previous origin. + *

+ * Default: ignore the preloaded buffer and fall back to {@link #enableSimilarityToNeighbors(int)}, + * which lets implementations that don't benefit from preloading remain unchanged. + */ + default void enableSimilarityToNeighbors(int origin, ByteSequence preloadedCodes) { + enableSimilarityToNeighbors(origin); + } + /** * @return true if `similarityToNeighbor` is supported */ diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java index b52d0ca66..04050fb35 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java @@ -89,6 +89,14 @@ public void enableSimilarityToNeighbors(int origin) { } } + @Override + public void enableSimilarityToNeighbors(int origin, ByteSequence preloadedCodes) { + if (this.origin != origin) { + this.origin = origin; + this.neighborCodes.copyFrom(preloadedCodes, 0, 0, preloadedCodes.length()); + } + } + @Override public float similarityTo(int node2) { if (!hierarchyCachedFeatures.containsKey(node2)) { diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java new file mode 100644 index 000000000..2a1c6c49b --- /dev/null +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java @@ -0,0 +1,167 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.github.jbellis.jvector.graph; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import io.github.jbellis.jvector.TestUtil; +import io.github.jbellis.jvector.disk.SimpleMappedReader; +import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex; +import io.github.jbellis.jvector.graph.disk.feature.FeatureId; +import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider; +import io.github.jbellis.jvector.graph.similarity.ScoreFunction; +import io.github.jbellis.jvector.graph.similarity.SearchScoreProvider; +import io.github.jbellis.jvector.quantization.PQVectors; +import io.github.jbellis.jvector.quantization.ProductQuantization; +import io.github.jbellis.jvector.util.Bits; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import io.github.jbellis.jvector.vector.types.VectorFloat; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Random; + +import static io.github.jbellis.jvector.graph.TestVectorGraph.createRandomFloatVectors; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Validates the async-IO pipeline added on the layer-0 FusedPQ search path. + *

+ * Two things to check: + *

    + *
  <li>The default {@link RandomAccessReader#readRangeAsync(long, int)} fallback returns the same
 *       bytes as a {@code seek}+{@code readFully} pair and does not change the reader's position.</li>
 *   <li>With pipeline enabled, search returns the same node ids and scores as the sync path on a
 *       deterministic FusedPQ graph (bit-equivalence — IO scheduling change must not affect the
 *       traversal).</li>
+ */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class TestAsyncPipelineSearch extends RandomizedTest { + + private Path testDirectory; + private Random random; + + @Before + public void setup() throws IOException { + testDirectory = Files.createTempDirectory(this.getClass().getSimpleName()); + random = getRandom(); + } + + @After + public void tearDown() { + TestUtil.deleteQuietly(testDirectory); + } + + @Test + public void testDefaultReadRangeAsyncMatchesSync() throws IOException { + Path file = testDirectory.resolve("bytes.bin"); + byte[] expected = new byte[1024]; + random.nextBytes(expected); + Files.write(file, expected); + + try (var supplier = new SimpleMappedReader.Supplier(file); + var reader = supplier.get()) { + reader.seek(7); + // capture position before + long beforePos = reader.getPosition(); + ByteBuffer got = reader.readRangeAsync(100, 200).join(); + // position must be unchanged + assertEquals(beforePos, reader.getPosition()); + // bytes must match + byte[] actual = new byte[200]; + got.get(actual); + assertArrayEquals(Arrays.copyOfRange(expected, 100, 300), actual); + } + } + + @Test + public void testAsyncPipelineMatchesSyncOnFusedPQ() throws IOException { + int size = 500; + int dim = 32; + var vectors = MockVectorValues.fromValues(createRandomFloatVectors(size, dim, random)); + + var simFn = VectorSimilarityFunction.EUCLIDEAN; + int topK = 10; + int rerankK = 40; + + var builder = new GraphIndexBuilder(vectors, simFn, 32, 32, 1.2f, 1.2f, false); + var tempGraph = builder.build(vectors); + var pq = ProductQuantization.compute(vectors, 8, 256, false); + var pqv = (PQVectors) pq.encodeAll(vectors); + + var outputPath = testDirectory.resolve("graph_fpq"); + TestUtil.writeFusedGraph(tempGraph, vectors, pqv, FeatureId.INLINE_VECTORS, outputPath); + + try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath); + var graph = OnDiskGraphIndex.load(readerSupplier, 0)) { + + for (int q = 0; q < 25; q++) { + VectorFloat query = 
TestUtil.randomVector(random, dim); + + // sync baseline + int[] syncNodes; + float[] syncScores; + try (var searcher = new GraphSearcher(graph)) { + var ssp = fusedScoreProvider(searcher.getView(), query, simFn); + var r = searcher.search(ssp, topK, rerankK, 0f, 0f, Bits.ALL); + syncNodes = nodeIds(r); + syncScores = nodeScores(r); + } + + // async pipeline + int[] asyncNodes; + float[] asyncScores; + try (var searcher = new GraphSearcher(graph)) { + searcher.setAsyncPipelineEnabled(true); + var ssp = fusedScoreProvider(searcher.getView(), query, simFn); + var r = searcher.search(ssp, topK, rerankK, 0f, 0f, Bits.ALL); + asyncNodes = nodeIds(r); + asyncScores = nodeScores(r); + } + + // bit-equivalent: same node ids in the same order, same scores. + assertArrayEquals("query " + q + " nodes differ", syncNodes, asyncNodes); + for (int i = 0; i < syncScores.length; i++) { + assertEquals("query " + q + " score[" + i + "]", + syncScores[i], asyncScores[i], 1e-6f); + } + } + } + } + + private static SearchScoreProvider fusedScoreProvider(ImmutableGraphIndex.View view, + VectorFloat query, + VectorSimilarityFunction simFn) { + var scoringView = (ImmutableGraphIndex.ScoringView) view; + ScoreFunction.ApproximateScoreFunction asf = + scoringView.approximateScoreFunctionFor(query, simFn); + var rr = scoringView.rerankerFor(query, simFn); + return new DefaultSearchScoreProvider(asf, rr); + } + + private static int[] nodeIds(SearchResult r) { + return Arrays.stream(r.getNodes()).mapToInt(ns -> ns.node).toArray(); + } + + private static float[] nodeScores(SearchResult r) { + SearchResult.NodeScore[] ns = r.getNodes(); + float[] out = new float[ns.length]; + for (int i = 0; i < ns.length; i++) out[i] = ns[i].score; + return out; + } +}