Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* This is a subset of DataInput, plus seek and readFully methods, which allows implementations
Expand Down Expand Up @@ -127,4 +128,43 @@ default void readFully(float[] floats) throws IOException {
* @throws IOException if an I/O error occurs
*/
long length() throws IOException;

/**
 * Asynchronously read {@code length} bytes starting at {@code offset}. The returned future
 * completes with a ByteBuffer positioned at 0 and limited to {@code length}.
 *
 * <p>Contract:
 * <ul>
 *   <li>Must NOT modify this reader's current seek position — on success OR failure.</li>
 *   <li>Must NOT block the caller waiting for IO to complete. Implementations backed by a
 *       parallel-capable backend (e.g. a network client) should dispatch the read and return
 *       the future immediately.</li>
 *   <li>Multiple async reads may be in flight concurrently. The async path is logically
 *       routed through a shared backend that bypasses this reader instance's position
 *       cursor.</li>
 * </ul>
 *
 * <p>The default implementation is a synchronous fallback that saves and restores the current
 * position around a {@code seek}/{@code readFully} pair, returning a completed future. The
 * position is restored in a {@code finally} block, so the "does not move the cursor" contract
 * holds even when the read fails partway through. This preserves correctness for local
 * file/mmap readers without giving them any actual concurrency benefit; readers that wrap a
 * non-blocking backend should override. Note the fallback is NOT safe for concurrent callers
 * on the same instance (it temporarily moves the shared cursor); overriding implementations
 * that promise concurrency must not share this limitation.
 *
 * @param offset starting offset
 * @param length number of bytes to read
 * @return a future completing with the read bytes, or failing with the underlying
 *         {@link IOException}
 */
default CompletableFuture<ByteBuffer> readRangeAsync(long offset, int length) {
    try {
        long saved = getPosition();
        byte[] bytes = new byte[length];
        try {
            seek(offset);
            readFully(bytes);
        } finally {
            // Restore the cursor on ALL paths. Without this, a failed readFully left the
            // reader positioned somewhere inside [offset, offset+length), silently breaking
            // subsequent relative reads by the caller. (If this seek itself throws, its
            // exception supersedes the original per Java finally semantics — both end up
            // failing the returned future below.)
            seek(saved);
        }
        return CompletableFuture.completedFuture(ByteBuffer.wrap(bytes));
    } catch (IOException e) {
        // Surface IO failures through the future rather than throwing synchronously,
        // so callers have a single (async) error-handling path.
        CompletableFuture<ByteBuffer> failed = new CompletableFuture<>();
        failed.completeExceptionally(e);
        return failed;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.github.jbellis.jvector.annotations.Experimental;
import io.github.jbellis.jvector.graph.ImmutableGraphIndex.NodeAtLevel;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider;
import io.github.jbellis.jvector.graph.similarity.ScoreFunction;
import io.github.jbellis.jvector.graph.similarity.SearchScoreProvider;
Expand All @@ -42,6 +43,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;


/**
Expand All @@ -65,6 +67,7 @@ public class GraphSearcher implements Closeable {
private CachingReranker cachingReranker;

private boolean pruneSearch;
private boolean asyncPipelineEnabled;
private final ScoreTracker.ScoreTrackerFactory scoreTrackerFactory;

private int visitedCount;
Expand Down Expand Up @@ -128,6 +131,27 @@ public void usePruning(boolean usage) {
pruneSearch = usage;
}

/**
 * Toggles the experimental 2-slot IO pipeline used by the layer-0 FusedPQ search path.
 * When on — and only when the graph view is an {@link OnDiskGraphIndex.View} exposing a
 * FUSED_PQ feature and the score function can score neighbor batches — {@link #searchOneLayer}
 * issues the asynchronous read for the next likely node before blocking on the current one.
 * Paired with a {@link io.github.jbellis.jvector.disk.RandomAccessReader} whose
 * {@code readRangeAsync} performs genuinely non-blocking IO (e.g. a network-backed reader),
 * this overlaps remote IO with similarity computation and shortens search wall time.
 *
 * <p>Against the default synchronous {@code readRangeAsync} fallback there is no overlap:
 * reads stay serialized and each node pays a small {@link CompletableFuture} plus
 * buffer-allocation cost. Keep this off for in-memory / mmap readers; turn it on only for
 * readers backed by a parallel-capable network client.
 *
 * <p>Off by default.
 */
@Experimental
public void setAsyncPipelineEnabled(boolean enabled) {
    asyncPipelineEnabled = enabled;
}

/**
* Convenience function for simple one-off searches. It is caller's responsibility to make sure that it
* is the unique owner of the vectors instance passed in here.
Expand Down Expand Up @@ -413,6 +437,17 @@ public void searchOneLayer(SearchScoreProvider scoreProvider,
int level,
Bits acceptOrdsThisLayer)
{
// Async-IO pipeline path: only when explicitly enabled and the graph/score-function shape
// matches. Falls back to the original synchronous loop otherwise.
if (asyncPipelineEnabled
&& level == 0
&& scoreProvider.scoreFunction().supportsSimilarityToNeighbors()
&& view instanceof OnDiskGraphIndex.View
&& ((OnDiskGraphIndex.View) view).hasFusedPQ()) {
searchOneLayerAsync(scoreProvider, rerankK, threshold, acceptOrdsThisLayer);
return;
}

try {
assert approximateResults.size() == 0; // should be cleared by setEntryPointsFromPreviousLayer
approximateResults.setMaxSize(rerankK);
Expand Down Expand Up @@ -459,6 +494,112 @@ public void searchOneLayer(SearchScoreProvider scoreProvider,
}
}

/**
 * 2-slot pipelined variant of {@link #searchOneLayer} for the layer-0 FusedPQ path. While
 * waiting on the read for the current frontier node, kicks off an asynchronous read for the
 * next likely-visited node (the heap's top). When the speculative peek matches the heap's
 * top after the current node's neighbors are scored and pushed, the prefetched future is
 * consumed for free; when it doesn't, the future's bytes still land in the underlying
 * reader's block cache (if any) and the search continues correctly.
 *
 * <p>Semantics relative to {@link #searchOneLayer}: same nodes are visited in the same order
 * (popping is deterministic given the heap state), so search results are bit-equivalent. Only
 * IO scheduling differs.
 *
 * <p>Caller guarantees (enforced by the dispatch in {@link #searchOneLayer}): {@code view} is
 * an {@link OnDiskGraphIndex.View} with a FUSED_PQ feature, and the score function supports
 * neighbor-batch similarity.
 */
private void searchOneLayerAsync(SearchScoreProvider scoreProvider,
                                 int rerankK,
                                 float threshold,
                                 Bits acceptOrdsThisLayer) {
    try {
        assert approximateResults.size() == 0;
        approximateResults.setMaxSize(rerankK);

        var scoreTracker = scoreTrackerFactory.getScoreTracker(pruneSearch, rerankK, threshold);
        OnDiskGraphIndex.View odView = (OnDiskGraphIndex.View) view;
        var scoreFunction = scoreProvider.scoreFunction();

        // Mirror the sync path's loop-head check before the first pop: an empty frontier or
        // an immediate stop condition means there is nothing to pipeline.
        if (candidates.size() == 0 || stopSearch(candidates, scoreTracker, rerankK, threshold)) {
            return;
        }

        // Prime slot 1 of the pipeline: pop the best frontier node and dispatch its read.
        float currentScore = candidates.topScore();
        int currentNode = candidates.pop();
        CompletableFuture<OnDiskGraphIndex.PackedNeighborData> currentRead =
                odView.readPackedNeighborsAsync(currentNode);

        // currentRead is never reassigned to null; the loop exits only via the break below.
        while (currentRead != null) {
            // Speculatively start the next read BEFORE waiting on the current one. The peeked
            // node is left on the heap (not popped) because this iteration's expansion may
            // push a better candidate above it.
            int peekedNode = -1;
            float peekedScore = 0f;
            CompletableFuture<OnDiskGraphIndex.PackedNeighborData> nextRead = null;
            if (candidates.size() > 0) {
                peekedScore = candidates.topScore();
                peekedNode = candidates.topNode();
                nextRead = odView.readPackedNeighborsAsync(peekedNode);
            }

            // Park on the current read; the speculative next read overlaps with what follows.
            // NOTE(review): join() wraps IO failures in an unchecked CompletionException —
            // presumably acceptable relative to the sync path's exception type; confirm
            // callers don't catch a specific IO exception here.
            OnDiskGraphIndex.PackedNeighborData data = currentRead.join();

            // Result/threshold bookkeeping — identical to the sync path.
            if (acceptOrdsThisLayer.get(currentNode) && currentScore >= threshold) {
                addTopCandidate(currentNode, currentScore, rerankK);
            }

            // Skip neighbor expansion if shouldStop says we're in drain mode AND we still
            // have enough queued candidates to fill the result set. Same condition as the
            // sync path's `continue`.
            boolean skipExpand = scoreTracker.shouldStop()
                    && candidates.size() >= rerankK - approximateResults.size();

            if (!skipExpand) {
                expandedCountBaseLayer++;
                expandedCount++;

                // Hand the prefetched PQ codes straight to the score function; `visited`
                // dedupes, so peekedNode (already visited when it was pushed) can never be
                // re-pushed here.
                scoreFunction.enableSimilarityToNeighbors(currentNode, data.codes);
                for (int i = 0; i < data.degree; i++) {
                    int friend = data.neighbors[i];
                    if (visited.add(friend)) {
                        float friendSim = scoreFunction.similarityToNeighbor(currentNode, i);
                        scoreTracker.track(friendSim);
                        candidates.push(friend, friendSim);
                        visitedCount++;
                    }
                }
            }

            // Stop check now that the queue reflects this iteration's pushes.
            if (candidates.size() == 0
                    || stopSearch(candidates, scoreTracker, rerankK, threshold)) {
                // nextRead (if any) is left to land in the block cache; no leak (on-heap buffers).
                break;
            }

            // Advance the pipeline.
            if (nextRead == null) {
                // Nothing was speculated (frontier was empty at peek time); plain dispatch.
                currentScore = candidates.topScore();
                currentNode = candidates.pop();
                currentRead = odView.readPackedNeighborsAsync(currentNode);
            } else if (candidates.topNode() == peekedNode) {
                // Speculation hit: the peeked node is still the heap's top, and since it was
                // never re-pushed its score is unchanged — reuse the in-flight read.
                candidates.pop();
                currentScore = peekedScore;
                currentNode = peekedNode;
                currentRead = nextRead;
            } else {
                // Compute step pushed a candidate that displaced peekedNode from the top.
                // Drop the speculative result locally; bytes are in the block cache so the
                // eventual visit to peekedNode will hit there.
                currentScore = candidates.topScore();
                currentNode = candidates.pop();
                currentRead = odView.readPackedNeighborsAsync(currentNode);
            }
        }
    } catch (Throwable t) {
        // Leave no partial results behind if anything in the pipeline throws.
        approximateResults.clear();
        throw t;
    }
}

private void searchLayer0(int topK, int rerankK, float threshold) {
// rR is persistent to save on allocations
rerankedResults.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.github.jbellis.jvector.util.RamUsageEstimator;
import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
import io.github.jbellis.jvector.vector.VectorizationProvider;
import io.github.jbellis.jvector.vector.types.ByteSequence;
import io.github.jbellis.jvector.vector.types.VectorFloat;
import io.github.jbellis.jvector.vector.types.VectorTypeSupport;

Expand All @@ -48,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -465,6 +467,25 @@ public double getAverageDegree(int level) {
return (double) sum / it.size();
}

/**
 * Immutable, self-contained result of {@link View#readPackedNeighborsAsync(int)}: a node's
 * PQ codes plus its resolved neighbor list. Holds no references to the View's mutable
 * scratch buffers, so any number of instances may be alive at once while the search loop
 * pipelines reads.
 */
public static final class PackedNeighborData {
    // Origin node this block was read for.
    public final int node;
    // Packed PQ codes for the node's neighborhood, copied out of the read buffer.
    public final ByteSequence<?> codes;
    // Actual number of neighbors; only the first `degree` slots of `neighbors` are valid.
    public final int degree;
    // Neighbor ordinals, length == degree.
    public final int[] neighbors;

    PackedNeighborData(int node, ByteSequence<?> codes, int[] neighbors, int degree) {
        this.node = node;
        this.degree = degree;
        this.neighbors = neighbors;
        this.codes = codes;
    }
}

public class View implements FeatureSource, ScoringView, RandomAccessVectorValues {
protected final RandomAccessReader reader;
private final int[] neighbors;
Expand Down Expand Up @@ -614,6 +635,57 @@ public Int2ObjectHashMap<FusedFeature.InlineSource> getInlineSourceFeatures() {
}
}

/**
 * @return true if this graph carries a FUSED_PQ feature on layer 0, i.e. the pipelined
 *         async search path ({@code readPackedNeighborsAsync}) is applicable
 */
public boolean hasFusedPQ() {
    var fusedPq = features.get(FeatureId.FUSED_PQ);
    return fusedPq != null && fusedPq.isFused();
}

/**
 * Asynchronously read the entire layer-0 block for {@code node} (PQ codes + degree +
 * neighbor ids) in a single contiguous range. Used by GraphSearcher's pipelined search
 * path to overlap remote IO with similarity compute.
 *
 * <p>The returned {@link PackedNeighborData} is self-contained — it does not share state
 * with the View's mutable scratch fields, so two reads can be in flight concurrently.
 *
 * <p>NOTE(review): the offset math assumes the on-disk node block is laid out as
 * [int header][inline feature block of {@code inlineBlockSize} bytes][int degree]
 * [maxDegree ints of neighbor ids], with FUSED_PQ codes at {@code inlineOffsets} within the
 * inline block, and that multi-byte ints are big-endian (matching {@link java.nio.ByteBuffer}'s
 * default order and the DataInput convention) — confirm against the writer.
 *
 * @throws UnsupportedOperationException (via the returned future) if the graph has no
 *         FUSED_PQ feature
 */
public CompletableFuture<PackedNeighborData> readPackedNeighborsAsync(int node) {
    Feature feature = features.get(FeatureId.FUSED_PQ);
    if (feature == null || !feature.isFused()) {
        // Fail through the future rather than throwing, so callers have one error path.
        CompletableFuture<PackedNeighborData> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new UnsupportedOperationException("readPackedNeighborsAsync requires a FusedPQ graph"));
        return failed;
    }
    long start = baseNodeOffsetFor(node);
    int maxDegree = layerInfo.get(0).degree;
    // One read covers the whole node block: header int + inline features + degree int
    // + maxDegree neighbor ints (hence maxDegree + 1 ints after the inline block).
    int totalLen = Integer.BYTES + inlineBlockSize + Integer.BYTES * (maxDegree + 1);
    // FUSED_PQ codes sit at their registered offset inside the inline block.
    int codesOffset = Integer.BYTES + inlineOffsets.get(FeatureId.FUSED_PQ);
    int codesLength = feature.featureSize();
    // Degree int immediately follows the inline block; neighbor ids follow the degree.
    int degreePos = Integer.BYTES + inlineBlockSize;

    return reader.readRangeAsync(start, totalLen).thenApply(buf -> {
        // Absolute gets only — the buffer is never shared, but absolute indexing keeps the
        // parse independent of the buffer's position.
        int actualDegree = buf.getInt(degreePos);
        assert actualDegree <= maxDegree
                : String.format("Node %d neighborCount %d > M %d", node, actualDegree, maxDegree);
        int[] nbrs = new int[actualDegree];
        int idsBase = degreePos + Integer.BYTES;
        for (int i = 0; i < actualDegree; i++) {
            nbrs[i] = buf.getInt(idsBase + Integer.BYTES * i);
        }
        // Copy codes out of the transient buffer so the result outlives it.
        ByteSequence<?> codes = vectorTypeSupport.createByteSequence(codesLength);
        for (int i = 0; i < codesLength; i++) {
            codes.set(i, buf.get(codesOffset + i));
        }
        return new PackedNeighborData(node, codes, nbrs, actualDegree);
    });
}

@Override
public void processNeighbors(int level, int node, ScoreFunction scoreFunction, IntMarker visited, NeighborProcessor neighborProcessor) {
var useEdgeLoading = scoreFunction.supportsSimilarityToNeighbors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.github.jbellis.jvector.graph.similarity;

import io.github.jbellis.jvector.vector.VectorizationProvider;
import io.github.jbellis.jvector.vector.types.ByteSequence;
import io.github.jbellis.jvector.vector.types.VectorTypeSupport;

/**
Expand Down Expand Up @@ -58,6 +59,18 @@ default float similarityToNeighbor(int origin, int neighborIndex) {
*/
default void enableSimilarityToNeighbors(int origin) {}

/**
 * Variant of {@link #enableSimilarityToNeighbors(int)} accepting neighbor codes the caller has
 * already loaded, as produced by the pipelined search path that prefetches codes asynchronously
 * while similarities for the previous origin are being computed.
 * <p>
 * By default the preloaded buffer is discarded and the single-argument overload is invoked, so
 * implementations that gain nothing from preloading need not override this method.
 */
default void enableSimilarityToNeighbors(int origin, ByteSequence<?> preloadedCodes) {
    enableSimilarityToNeighbors(origin);
}

/**
* @return true if `similarityToNeighbor` is supported
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ public void enableSimilarityToNeighbors(int origin) {
}
}

@Override
public void enableSimilarityToNeighbors(int origin, ByteSequence<?> preloadedCodes) {
    if (this.origin == origin) {
        // Codes for this origin are already staged in neighborCodes; nothing to copy.
        return;
    }
    this.origin = origin;
    neighborCodes.copyFrom(preloadedCodes, 0, 0, preloadedCodes.length());
}

@Override
public float similarityTo(int node2) {
if (!hierarchyCachedFeatures.containsKey(node2)) {
Expand Down
Loading
Loading