From 94aa795735b3b30122e960c9e470806110c9e868 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 11 May 2026 23:23:10 +0200 Subject: [PATCH] Add CompactionProgressListener to OnDiskGraphIndexCompactor Introduce a dedicated CompactionProgressListener functional interface and a compact(Path, CompactionProgressListener) overload so callers can track streaming compaction I/O progress without parsing log messages. The listener is called every ten batches (and at level completion) with (completedBatches, totalBatches) for each graph level processed. The existing compact(Path) delegate passes null and preserves full backward compatibility. Motivated by HerdDB issue #530: the index-optimizer's GET /status endpoint showed batches_written=0 / pct_complete=0 throughout multi- minute streaming compaction runs because there was no hook to propagate the per-batch progress counters back to the HTTP status object. --- .../disk/CompactionProgressListener.java | 50 +++++++++++++++++++ .../graph/disk/OnDiskGraphIndexCompactor.java | 39 ++++++++++++--- 2 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java new file mode 100644 index 000000000..5a6fb9f1a --- /dev/null +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/CompactionProgressListener.java @@ -0,0 +1,50 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.jbellis.jvector.graph.disk; + +/** + * Callback interface for monitoring the I/O progress of an + * {@link OnDiskGraphIndexCompactor} run. + * + *

The compactor processes nodes in batches, writing each batch to disk as it + * completes. {@link #onProgress} is called every ten batches so that the caller + * can update a live status display or health-check endpoint without incurring the + * overhead of a callback on every single batch. + * + *

For graphs with multiple hierarchical levels the compactor calls + * {@link #onProgress} separately per level; the {@code totalBatches} value resets + * to the batch count of the new level at the start of each level. Level 0 + * (the base layer) is by far the largest and dominates total runtime, so + * level-local progress is a good proxy for overall merge progress. + * + *

Implementations must be thread-safe: callbacks are issued from whichever + * thread drives the completion service inside + * {@code OnDiskGraphIndexCompactor.runBatchesWithBackpressure}. + */ +@FunctionalInterface +public interface CompactionProgressListener { + + /** + * Called periodically as batches are written to disk. + * + * @param completedBatches number of batches written to disk so far in the + * current level (always a positive multiple of 10, + * or equal to {@code totalBatches} for the final call) + * @param totalBatches total number of batches for the current level + */ + void onProgress(long completedBatches, long totalBatches); +} diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java index ef1d8bcb5..4939c8e5f 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexCompactor.java @@ -207,8 +207,24 @@ private void validateFeatures(List sources) { /** * Main compaction entry point. Merges all source indexes into a single output index at the * specified path, handling PQ retraining if needed, and writing header, all layers, and footer. + * + *

Equivalent to {@code compact(outputPath, null)}. */ public void compact(Path outputPath) throws FileNotFoundException { + compact(outputPath, null); + } + + /** + * Main compaction entry point with optional progress reporting. Merges all source indexes into + * a single output index at the specified path, handling PQ retraining if needed, and writing + * header, all layers, and footer. + * + * @param outputPath destination file for the merged graph + * @param progressListener optional callback invoked every ten batches (and at completion of + * each level) so the caller can track I/O progress; pass {@code null} + * to disable progress reporting + */ + public void compact(Path outputPath, CompactionProgressListener progressListener) throws FileNotFoundException { boolean fusedPQEnabled = hasFusedPQ(); boolean compressedPrecision = fusedPQEnabled; @@ -230,7 +246,7 @@ public void compact(Path outputPath) throws FileNotFoundException { numTotalNodes, maxOrdinal, dimension, maxDegrees.get(0)); try (CompactWriter writer = new CompactWriter(outputPath, maxOrdinal, numTotalNodes, 0, layerInfo, entryNode, dimension, maxDegrees, pq, pqLength, fusedPQEnabled)) { writer.writeHeader(); - compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq); + compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq, progressListener); // When FusedPQ is enabled and there is no hierarchy (only L0), the reader expects // to find the entry node's own PQ code written after the L0 block, just as @@ -304,7 +320,8 @@ private void compactLevels(CompactWriter writer, VectorSimilarityFunction similarityFunction, boolean fusedPQEnabled, boolean compressedPrecision, - ProductQuantization pq) + ProductQuantization pq, + CompactionProgressListener progressListener) throws IOException, ExecutionException, InterruptedException { int maxUpperDegree = 0; @@ -362,7 +379,8 @@ private void compactLevels(CompactWriter writer, } catch (IOException e) { throw new RuntimeException(e); } - } + }, + progressListener ); } @@ -399,7 +417,8 @@ private void compactLevels(CompactWriter writer, } catch (IOException e) { throw new RuntimeException(e); } - } + }, + progressListener ); } } @@ -740,12 +759,17 @@ private float rescore(OnDiskGraphIndex.View view, * Executes batches with controlled concurrency using a sliding window approach. Prevents * overwhelming memory by limiting the number of in-flight tasks while maintaining high * throughput via the completion service. + * + * @param progressListener optional; when non-null, called every ten batches (and at the + * final batch) with {@code (completedBatches, totalBatches)} so + * the caller can expose live compaction progress */ private void runBatchesWithBackpressure( List batches, ExecutorCompletionService> ecs, java.util.function.Consumer submitOne, - java.util.function.Consumer> onComplete + java.util.function.Consumer> onComplete, + CompactionProgressListener progressListener ) throws InterruptedException, ExecutionException { final int total = batches.size(); @@ -770,8 +794,11 @@ private void runBatchesWithBackpressure( submitOne.accept(batches.get(nextToSubmit++)); inFlight++; } - if (completed % 10 == 0) { + if (completed % 10 == 0 || completed == total) { log.info("Compaction I/O progress: {}/{} batches written to disk", completed, total); + if (progressListener != null) { + progressListener.onProgress(completed, total); + } } } }