diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java b/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java index 09e751e1a..4d3497617 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/disk/RandomAccessReader.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** * This is a subset of DataInput, plus seek and readFully methods, which allows implementations @@ -127,4 +128,43 @@ default void readFully(float[] floats) throws IOException { * @throws IOException if an I/O error occurs */ long length() throws IOException; + + /** + * Asynchronously read {@code length} bytes starting at {@code offset}. The returned future + * completes with a ByteBuffer positioned at 0 and limited to {@code length}. + * + *
Contract: + *
The default implementation is a synchronous fallback that saves and restores the current
+ * position around a {@code seek}/{@code readFully} pair, returning a completed future. This
+ * preserves correctness for local file/mmap readers without giving them any actual concurrency
+ * benefit; readers that wrap a non-blocking backend should override.
+ *
+ * @param offset starting offset
+ * @param length number of bytes to read
+ * @return a future completing with the read bytes
+ */
+ default CompletableFuture With the default synchronous fallback for {@code readRangeAsync}, this mode degrades to
+ * serialized reads with a small per-node {@link CompletableFuture} overhead and per-call
+ * buffer allocation — no concurrency benefit. Leave this disabled for in-memory / mmap
+ * readers; enable it for readers backed by a parallel-capable network client.
+ *
+ * Disabled by default.
+ */
+    @Experimental
+    public void setAsyncPipelineEnabled(boolean enabled) {
+        // Opt-in switch: searchOneLayer consults this flag before taking the pipelined path.
+        asyncPipelineEnabled = enabled;
+    }
+
/**
* Convenience function for simple one-off searches. It is caller's responsibility to make sure that it
* is the unique owner of the vectors instance passed in here.
@@ -413,6 +437,17 @@ public void searchOneLayer(SearchScoreProvider scoreProvider,
int level,
Bits acceptOrdsThisLayer)
{
+ // Async-IO pipeline path: only when explicitly enabled and the graph/score-function shape
+ // matches. Falls back to the original synchronous loop otherwise.
+ if (asyncPipelineEnabled
+ && level == 0
+ && scoreProvider.scoreFunction().supportsSimilarityToNeighbors()
+ && view instanceof OnDiskGraphIndex.View
+ && ((OnDiskGraphIndex.View) view).hasFusedPQ()) {
+ searchOneLayerAsync(scoreProvider, rerankK, threshold, acceptOrdsThisLayer);
+ return;
+ }
+
try {
assert approximateResults.size() == 0; // should be cleared by setEntryPointsFromPreviousLayer
approximateResults.setMaxSize(rerankK);
@@ -459,6 +494,112 @@ public void searchOneLayer(SearchScoreProvider scoreProvider,
}
}
+ /**
+ * 2-slot pipelined variant of {@link #searchOneLayer} for the layer-0 FusedPQ path. While
+ * waiting on the read for the current frontier node, kicks off an asynchronous read for the
+ * next likely-visited node (the heap's top). When the speculative peek matches the heap's
+ * top after the current node's neighbors are scored and pushed, the prefetched future is
+ * consumed for free; when it doesn't, the future's bytes still land in the underlying
+ * reader's block cache (if any) and the search continues correctly.
+ *
+ * Semantics relative to {@link #searchOneLayer}: same nodes are visited in the same order
+ * (popping is deterministic given the heap state), so search results are bit-equivalent. Only
+ * IO scheduling differs.
+ */
+ private void searchOneLayerAsync(SearchScoreProvider scoreProvider,
+ int rerankK,
+ float threshold,
+ Bits acceptOrdsThisLayer) {
+ try {
+ assert approximateResults.size() == 0;
+ approximateResults.setMaxSize(rerankK);
+
+ var scoreTracker = scoreTrackerFactory.getScoreTracker(pruneSearch, rerankK, threshold);
+ OnDiskGraphIndex.View odView = (OnDiskGraphIndex.View) view;
+ var scoreFunction = scoreProvider.scoreFunction();
+
+ if (candidates.size() == 0 || stopSearch(candidates, scoreTracker, rerankK, threshold)) {
+ return;
+ }
+
+ float currentScore = candidates.topScore();
+ int currentNode = candidates.pop();
+ CompletableFuture The returned {@link PackedNeighborData} is self-contained — it does not share state
+ * with the View's mutable scratch fields, so two reads can be in flight concurrently.
+ *
+ * @throws UnsupportedOperationException if the graph has no FUSED_PQ feature
+ */
+ public CompletableFuture
+ * Default: ignore the preloaded buffer and fall back to {@link #enableSimilarityToNeighbors(int)},
+ * which lets implementations that don't benefit from preloading remain unchanged.
+ */
+    default void enableSimilarityToNeighbors(int origin, ByteSequence<?> preloadedCodes) {
+        // preloadedCodes is deliberately unused here: delegate to the single-argument overload.
+        enableSimilarityToNeighbors(origin);
+    }
+
/**
* @return true if `similarityToNeighbor` is supported
*/
diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java
index b52d0ca66..04050fb35 100644
--- a/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java
+++ b/jvector-base/src/main/java/io/github/jbellis/jvector/quantization/FusedPQDecoder.java
@@ -89,6 +89,14 @@ public void enableSimilarityToNeighbors(int origin) {
}
}
+    @Override
+    public void enableSimilarityToNeighbors(int origin, ByteSequence<?> preloadedCodes) {
+        // Same origin as the last call: keep the codes already held and skip the copy.
+        if (this.origin == origin) {
+            return;
+        }
+        // Copy first and record the origin only afterwards, so that a failed copy cannot leave
+        // this decoder marked as loaded for the new origin while still holding stale codes.
+        this.neighborCodes.copyFrom(preloadedCodes, 0, 0, preloadedCodes.length());
+        this.origin = origin;
+    }
+
@Override
public float similarityTo(int node2) {
if (!hierarchyCachedFeatures.containsKey(node2)) {
diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java
new file mode 100644
index 000000000..2a1c6c49b
--- /dev/null
+++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/TestAsyncPipelineSearch.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.github.jbellis.jvector.graph;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import io.github.jbellis.jvector.TestUtil;
+import io.github.jbellis.jvector.disk.SimpleMappedReader;
+import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
+import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
+import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider;
+import io.github.jbellis.jvector.graph.similarity.ScoreFunction;
+import io.github.jbellis.jvector.graph.similarity.SearchScoreProvider;
+import io.github.jbellis.jvector.quantization.PQVectors;
+import io.github.jbellis.jvector.quantization.ProductQuantization;
+import io.github.jbellis.jvector.util.Bits;
+import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
+import io.github.jbellis.jvector.vector.types.VectorFloat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Random;
+
+import static io.github.jbellis.jvector.graph.TestVectorGraph.createRandomFloatVectors;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Validates the async-IO pipeline added on the layer-0 FusedPQ search path.
+ *
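+ * Two things to check:
+ * <ol>
+ *   <li>The default {@code readRangeAsync} returns the requested bytes and leaves the
+ *       reader's position unchanged.</li>
+ *   <li>A search with the async pipeline enabled returns the same nodes, in the same order
+ *       and with the same scores, as the synchronous search.</li>
+ * </ol>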
+ */
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class TestAsyncPipelineSearch extends RandomizedTest {
+
+ private Path testDirectory;
+ private Random random;
+
+ @Before
+ public void setup() throws IOException {
+ testDirectory = Files.createTempDirectory(this.getClass().getSimpleName());
+ random = getRandom();
+ }
+
+ @After
+ public void tearDown() {
+ TestUtil.deleteQuietly(testDirectory);
+ }
+
+ @Test
+ public void testDefaultReadRangeAsyncMatchesSync() throws IOException {
+ Path file = testDirectory.resolve("bytes.bin");
+ byte[] expected = new byte[1024];
+ random.nextBytes(expected);
+ Files.write(file, expected);
+
+ try (var supplier = new SimpleMappedReader.Supplier(file);
+ var reader = supplier.get()) {
+ reader.seek(7);
+ // capture position before
+ long beforePos = reader.getPosition();
+ ByteBuffer got = reader.readRangeAsync(100, 200).join();
+ // position must be unchanged
+ assertEquals(beforePos, reader.getPosition());
+ // bytes must match
+ byte[] actual = new byte[200];
+ got.get(actual);
+ assertArrayEquals(Arrays.copyOfRange(expected, 100, 300), actual);
+ }
+ }
+
+ @Test
+ public void testAsyncPipelineMatchesSyncOnFusedPQ() throws IOException {
+ int size = 500;
+ int dim = 32;
+ var vectors = MockVectorValues.fromValues(createRandomFloatVectors(size, dim, random));
+
+ var simFn = VectorSimilarityFunction.EUCLIDEAN;
+ int topK = 10;
+ int rerankK = 40;
+
+ var builder = new GraphIndexBuilder(vectors, simFn, 32, 32, 1.2f, 1.2f, false);
+ var tempGraph = builder.build(vectors);
+ var pq = ProductQuantization.compute(vectors, 8, 256, false);
+ var pqv = (PQVectors) pq.encodeAll(vectors);
+
+ var outputPath = testDirectory.resolve("graph_fpq");
+ TestUtil.writeFusedGraph(tempGraph, vectors, pqv, FeatureId.INLINE_VECTORS, outputPath);
+
+ try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath);
+ var graph = OnDiskGraphIndex.load(readerSupplier, 0)) {
+
+ for (int q = 0; q < 25; q++) {
+ VectorFloat> query = TestUtil.randomVector(random, dim);
+
+ // sync baseline
+ int[] syncNodes;
+ float[] syncScores;
+ try (var searcher = new GraphSearcher(graph)) {
+ var ssp = fusedScoreProvider(searcher.getView(), query, simFn);
+ var r = searcher.search(ssp, topK, rerankK, 0f, 0f, Bits.ALL);
+ syncNodes = nodeIds(r);
+ syncScores = nodeScores(r);
+ }
+
+ // async pipeline
+ int[] asyncNodes;
+ float[] asyncScores;
+ try (var searcher = new GraphSearcher(graph)) {
+ searcher.setAsyncPipelineEnabled(true);
+ var ssp = fusedScoreProvider(searcher.getView(), query, simFn);
+ var r = searcher.search(ssp, topK, rerankK, 0f, 0f, Bits.ALL);
+ asyncNodes = nodeIds(r);
+ asyncScores = nodeScores(r);
+ }
+
+ // bit-equivalent: same node ids in the same order, same scores.
+ assertArrayEquals("query " + q + " nodes differ", syncNodes, asyncNodes);
+ for (int i = 0; i < syncScores.length; i++) {
+ assertEquals("query " + q + " score[" + i + "]",
+ syncScores[i], asyncScores[i], 1e-6f);
+ }
+ }
+ }
+ }
+
+ private static SearchScoreProvider fusedScoreProvider(ImmutableGraphIndex.View view,
+ VectorFloat> query,
+ VectorSimilarityFunction simFn) {
+ var scoringView = (ImmutableGraphIndex.ScoringView) view;
+ ScoreFunction.ApproximateScoreFunction asf =
+ scoringView.approximateScoreFunctionFor(query, simFn);
+ var rr = scoringView.rerankerFor(query, simFn);
+ return new DefaultSearchScoreProvider(asf, rr);
+ }
+
+ private static int[] nodeIds(SearchResult r) {
+ return Arrays.stream(r.getNodes()).mapToInt(ns -> ns.node).toArray();
+ }
+
+ private static float[] nodeScores(SearchResult r) {
+ SearchResult.NodeScore[] ns = r.getNodes();
+ float[] out = new float[ns.length];
+ for (int i = 0; i < ns.length; i++) out[i] = ns[i].score;
+ return out;
+ }
+}