diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java new file mode 100644 index 000000000..5a6fb9f1a --- /dev/null +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java @@ -0,0 +1,50 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.jbellis.jvector.graph.disk; + +/** + * Callback interface for monitoring the I/O progress of an + * {@link OnDiskGraphIndexCompactor} run. + * + *
<p>
The compactor processes nodes in batches, writing each batch to disk as it + * completes. {@link #onProgress} is called every ten batches (and for the final batch + * of a level) so that the caller can update a live status display or health-check + * endpoint without incurring the overhead of a callback on every single batch. + * + *
<p>
For graphs with multiple hierarchical levels the compactor calls + * {@link #onProgress} separately per level; the {@code totalBatches} value resets + * to the batch count of the new level at the start of each level. Level 0 + * (the base layer) is by far the largest and dominates total runtime, so + * level-local progress is a good proxy for overall merge progress. + * + *
<p>
Implementations must be thread-safe: callbacks are issued from whichever + * thread drives the completion service inside + * {@code OnDiskGraphIndexCompactor.runBatchesWithBackpressure}. + */ +@FunctionalInterface +public interface CompactionProgressListener { + + /** + * Called periodically as batches are written to disk. + * + * @param completedBatches number of batches written to disk so far in the + * current level (always a positive multiple of 10, + * or equal to {@code totalBatches} for the final call) + * @param totalBatches total number of batches for the current level + */ + void onProgress(long completedBatches, long totalBatches); +} diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index ef1d8bcb5..4939c8e5f 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -207,8 +207,24 @@ private void validateFeatures(List sources) { /** * Main compaction entry point. Merges all source indexes into a single output index at the * specified path, handling PQ retraining if needed, and writing header, all layers, and footer. + * + *
<p>
Equivalent to {@code compact(outputPath, null)}. */ public void compact(Path outputPath) throws FileNotFoundException { + compact(outputPath, null); + } + + /** + * Main compaction entry point with optional progress reporting. Merges all source indexes into + * a single output index at the specified path, handling PQ retraining if needed, and writing + * header, all layers, and footer. + * + * @param outputPath destination file for the merged graph + * @param progressListener optional callback invoked every ten batches (and at completion of + * each level) so the caller can track I/O progress; pass {@code null} + * to disable progress reporting + */ + public void compact(Path outputPath, CompactionProgressListener progressListener) throws FileNotFoundException { boolean fusedPQEnabled = hasFusedPQ(); boolean compressedPrecision = fusedPQEnabled; @@ -230,7 +246,7 @@ public void compact(Path outputPath) throws FileNotFoundException { numTotalNodes, maxOrdinal, dimension, maxDegrees.get(0)); try (CompactWriter writer = new CompactWriter(outputPath, maxOrdinal, numTotalNodes, 0, layerInfo, entryNode, dimension, maxDegrees, pq, pqLength, fusedPQEnabled)) { writer.writeHeader(); - compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq); + compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq, progressListener); // When FusedPQ is enabled and there is no hierarchy (only L0), the reader expects // to find the entry node's own PQ code written after the L0 block, just as @@ -304,7 +320,8 @@ private void compactLevels(CompactWriter writer, VectorSimilarityFunction similarityFunction, boolean fusedPQEnabled, boolean compressedPrecision, - ProductQuantization pq) + ProductQuantization pq, + CompactionProgressListener progressListener) throws IOException, ExecutionException, InterruptedException { int maxUpperDegree = 0; @@ -362,7 +379,8 @@ private void compactLevels(CompactWriter writer, } catch (IOException e) { throw new 
RuntimeException(e); } - } + }, + progressListener ); } @@ -399,7 +417,8 @@ private void compactLevels(CompactWriter writer, } catch (IOException e) { throw new RuntimeException(e); } - } + }, + progressListener ); } } @@ -740,12 +759,17 @@ private float rescore(OnDiskGraphIndex.View view, * Executes batches with controlled concurrency using a sliding window approach. Prevents * overwhelming memory by limiting the number of in-flight tasks while maintaining high * throughput via the completion service. + * + * @param progressListener optional; when non-null, called every ten batches (and at the + * final batch) with {@code (completedBatches, totalBatches)} so + * the caller can expose live compaction progress */ private void runBatchesWithBackpressure( List batches, ExecutorCompletionService> ecs, java.util.function.Consumer submitOne, - java.util.function.Consumer> onComplete + java.util.function.Consumer> onComplete, + CompactionProgressListener progressListener ) throws InterruptedException, ExecutionException { final int total = batches.size(); @@ -770,8 +794,11 @@ private void runBatchesWithBackpressure( submitOne.accept(batches.get(nextToSubmit++)); inFlight++; } - if (completed % 10 == 0) { + if (completed % 10 == 0 || completed == total) { log.info("Compaction I/O progress: {}/{} batches written to disk", completed, total); + if (progressListener != null) { + progressListener.onProgress(completed, total); + } } } }