Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
Expand Down Expand Up @@ -226,4 +227,7 @@ public String toString() {

// No-op: this implementation performs no integrity verification (empty body).
@Override
public void checkIntegrity() throws IOException {}

// No-op: the abort checker is unused and no integrity verification is performed (empty body).
@Override
public void checkIntegrity(MergePolicy.AbortChecker abortChecker) throws IOException {}
}
59 changes: 58 additions & 1 deletion lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataInput;
Expand Down Expand Up @@ -604,9 +605,31 @@ private static void validateFooter(IndexInput in) throws IOException {
* extract the checksum value, call {@link #retrieveChecksum}.
*/
public static long checksumEntireFile(IndexInput input) throws IOException {
// Delegate to the abort-aware overload; NO_OP requests no abort checks during the read.
return checksumEntireFile(input, MergePolicy.AbortChecker.NO_OP);
}

/**
* Like {@link #checksumEntireFile(IndexInput)}, but periodically checks if the operation should
* be aborted via the provided {@link org.apache.lucene.index.MergePolicy.AbortChecker}. This is
* useful during merge operations where the merge may be cancelled while a long-running integrity
* check is in progress.
*
* @param input the index input to checksum
* @param abortChecker checked periodically during the read; throws {@link
* org.apache.lucene.index.MergePolicy.MergeAbortedException} if the operation should stop
*/
public static long checksumEntireFile(IndexInput input, MergePolicy.AbortChecker abortChecker)
throws IOException {
IndexInput clone = input.clone();
clone.seek(0);
ChecksumIndexInput in = new BufferedChecksumIndexInput(clone);
final ChecksumIndexInput in;
int intervalBytes = abortChecker.getAbortCheckIntervalBytes();
if (intervalBytes > 0) {
in = new AbortableBufferedChecksumIndexInput(clone, abortChecker, intervalBytes);
} else {
assert abortChecker == MergePolicy.AbortChecker.NO_OP : abortChecker;
in = new BufferedChecksumIndexInput(clone);
}
assert in.getFilePointer() == 0;
if (in.length() < footerLength()) {
throw new CorruptIndexException(
Expand Down Expand Up @@ -675,4 +698,38 @@ public static int readBEInt(DataInput in) throws IOException {
public static long readBELong(DataInput in) throws IOException {
  // Read the two 32-bit halves in stream order: the most-significant half comes first.
  final long upper = readBEInt(in);
  // Mask after widening so sign extension of the lower half cannot clobber the upper bits.
  final long lower = readBEInt(in) & 0xFFFFFFFFL;
  return (upper << 32) | lower;
}

/**
* A {@link BufferedChecksumIndexInput} that periodically checks if the current merge should be
* aborted.
*
* <p>The check is performed in {@link #readBytes} since that is what {@link
* ChecksumIndexInput#seek} calls in a loop to compute the checksum.
*/
private static final class AbortableBufferedChecksumIndexInput

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you're adding another level of buffering / copying, when someone turns on this new setting? I think? Can we subclass FilterIndexInput instead, so it's simply/only the added accounting (tracking total bytes written) and not more buffering / copying?

extends BufferedChecksumIndexInput {

private final MergePolicy.AbortChecker abortChecker;
private final int intervalBytes;
private long bytesSinceLastCheck;

/**
* Creates a checksum input over {@code delegate} that consults {@code abortChecker} once at least
* {@code intervalBytes} bytes have been requested through {@link #readBytes} since the last
* check.
*
* @param delegate the input handed to the {@link BufferedChecksumIndexInput} constructor
* @param abortChecker the checker whose {@code checkAborted()} is invoked from {@link #readBytes}
* @param intervalBytes number of requested bytes to accumulate between two abort checks
*/
AbortableBufferedChecksumIndexInput(
IndexInput delegate, MergePolicy.AbortChecker abortChecker, int intervalBytes) {
super(delegate);
this.abortChecker = abortChecker;
this.intervalBytes = intervalBytes;
// Start the counter at the threshold so that the very first readBytes call triggers a check.
this.bytesSinceLastCheck = intervalBytes;
}

/**
 * Reads bytes like the superclass, but first accounts for the requested length and consults the
 * abort checker once at least {@code intervalBytes} bytes have been requested since the last
 * check.
 */
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
  final long requested = bytesSinceLastCheck + len;
  bytesSinceLastCheck = requested;
  if (requested >= intervalBytes) {
    // May throw to abort the read; the counter is only reset when the check passes.
    abortChecker.checkAborted();
    bytesSinceLastCheck = 0L;
  }
  super.readBytes(b, offset, len);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.StoredFields;

Expand Down Expand Up @@ -47,6 +48,13 @@ protected StoredFieldsReader() {}
*/
public abstract void checkIntegrity() throws IOException;

// NOTE(review): because this overload is abstract, every existing subclass must implement it;
// consider a non-abstract default that delegates to checkIntegrity() if that is not intended.
/**
* Checks consistency of this reader, with periodic merge abort checking.
*
* @param abortChecker consulted while checking, so that the check can stop when the merge is
*     aborted
* @lucene.internal
*/
public abstract void checkIntegrity(MergePolicy.AbortChecker abortChecker) throws IOException;

/**
* Returns an instance optimized for merging. This instance may not be cloned.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public int merge(MergeState mergeState) throws IOException {
List<StoredFieldsMergeSub> subs = new ArrayList<>();
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
StoredFieldsReader storedFieldsReader = mergeState.storedFieldsReaders[i];
storedFieldsReader.checkIntegrity();
storedFieldsReader.checkIntegrity(mergeState.abortChecker);
subs.add(
new StoredFieldsMergeSub(
new MergeVisitor(mergeState, i),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.index.MergePolicy;

abstract class FieldsIndex implements Cloneable, Closeable {

Expand All @@ -38,6 +39,9 @@ final long getStartPointer(int docID) {
/** Check the integrity of the index. */
abstract void checkIntegrity() throws IOException;

// NOTE(review): this overload is public while the sibling checkIntegrity() declared in this
// class is package-private; confirm the visibility difference is intended.
/**
* Check the integrity of the index, with periodic merge abort checking.
*
* @param abortChecker checker to consult periodically while the index is being verified
*/
public abstract void checkIntegrity(MergePolicy.AbortChecker abortChecker) throws IOException;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are only adding "check every N bytes", can we rename AbortChecker to something more specific? Is this a public API, or will Lucene users only interact with the setter/getter on IWC (taking just int or long)?


@Override
public abstract FieldsIndex clone();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FileTypeHint;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -155,6 +156,11 @@ public long getMaxPointer() {

@Override
void checkIntegrity() throws IOException {
CodecUtil.checksumEntireFile(indexInput);
checkIntegrity(MergePolicy.AbortChecker.NO_OP);
}

@Override
public void checkIntegrity(MergePolicy.AbortChecker abortChecker) throws IOException {
// Checksum the whole index input; the checker is handed on so the read can be aborted.
CodecUtil.checksumEntireFile(indexInput, abortChecker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.StoredFieldDataInput;
import org.apache.lucene.index.StoredFieldVisitor;
Expand Down Expand Up @@ -756,8 +757,13 @@ int getNumDocs() {

@Override
public void checkIntegrity() throws IOException {
indexReader.checkIntegrity();
CodecUtil.checksumEntireFile(fieldsStream);
checkIntegrity(MergePolicy.AbortChecker.NO_OP);
}

@Override
public void checkIntegrity(MergePolicy.AbortChecker abortChecker) throws IOException {
// Check indexReader first, then checksum fieldsStream; the same abortChecker is passed to both.
indexReader.checkIntegrity(abortChecker);
CodecUtil.checksumEntireFile(fieldsStream, abortChecker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public int merge(MergeState mergeState) throws IOException {
new ArrayList<>(mergeState.storedFieldsReaders.length);
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
final StoredFieldsReader reader = mergeState.storedFieldsReaders[i];
reader.checkIntegrity();
reader.checkIntegrity(mergeState.abortChecker);
MergeStrategy mergeStrategy = getMergeStrategy(mergeState, matchingReaders, i);
if (mergeStrategy == MergeStrategy.VISITOR) {
visitors[i] = new MergeVisitor(mergeState, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static MergeState restrictFields(MergeState in, Collection<String> fields) {
in.maxDocs,
in.infoStream,
in.intraMergeTaskExecutor,
in.needsIndexSort);
in.needsIndexSort,
in.abortChecker);
}

private static class FilterFieldInfos extends FieldInfos {
Expand Down
12 changes: 10 additions & 2 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3483,6 +3483,9 @@ public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException
}
}

final MergePolicy.AbortChecker mergeAbortChecker =
MergePolicy.AbortChecker.create(merge, config.getMergeAbortCheckIntervalBytes());

SegmentMerger merger =
new SegmentMerger(
readers,
Expand All @@ -3491,7 +3494,8 @@ public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException
trackingDir,
globalFieldNumberMap,
context,
intraMergeExecutor);
intraMergeExecutor,
mergeAbortChecker);
try {
if (!merger.shouldMerge()) {
return;
Expand Down Expand Up @@ -5284,6 +5288,9 @@ public int length() {
}
}

final MergePolicy.AbortChecker mergeAbortChecker =
MergePolicy.AbortChecker.create(merge, config.getMergeAbortCheckIntervalBytes());

final SegmentMerger merger =
new SegmentMerger(
mergeReaders,
Expand All @@ -5292,7 +5299,8 @@ public int length() {
dirWrapper,
globalFieldNumberMap,
context,
intraMergeExecutor);
intraMergeExecutor,
mergeAbortChecker);
MergeState mergeState = merger.mergeState;
MergeState.DocMap[] docMaps;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,21 @@ public IndexWriterConfig setParentField(String parentField) {
this.parentField = parentField;
return this;
}

/**
* Expert: sets the interval in bytes between abort checks during merge integrity verification.
* During merges, codec readers verify file integrity by reading entire files to compute
* checksums. This setting controls how often the merge abort flag is checked during these reads.
* Smaller values allow merges to be aborted more promptly but add slight overhead.
*
* @param intervalBytes the interval in bytes, must be positive
*/
public IndexWriterConfig setMergeAbortCheckIntervalBytes(int intervalBytes) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this long? In general if something is measuring bytes let's try to use long -- we do math on such things that may lead to overflow?

@rmuir rmuir May 22, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not need a configurable value at all, just a reasonable one (e.g. 1 MB). If someone is using S3, even setting this to 1 could take "forever" due to network latency. We shouldn't overengineer here.

If we can simplify how we do it (see idea on CodecUtil.java), I'd suggest overloading the implementing function with a parameter, to allow unit tests to use a small value, but it doesn't need to be public.

if (intervalBytes <= 0) {
throw new IllegalArgumentException(
"mergeAbortCheckIntervalBytes must be positive, got: " + intervalBytes);
}
this.mergeAbortCheckIntervalBytes = intervalBytes;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public class LiveIndexWriterConfig {
/** The IndexWriter event listener to record key events * */
protected IndexWriterEventListener eventListener;

/** Interval in bytes between abort checks during merge integrity verification. */
protected volatile int mergeAbortCheckIntervalBytes;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it only checkIntegrity that we are newly instrumenting here?


// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
Expand Down Expand Up @@ -461,6 +464,14 @@ public IndexWriterEventListener getIndexWriterEventListener() {
return eventListener;
}

/**
* Returns the interval in bytes between abort checks during merge integrity verification. See
* {@link IndexWriterConfig#setMergeAbortCheckIntervalBytes(int)}.
*/
public int getMergeAbortCheckIntervalBytes() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long instead?

return mergeAbortCheckIntervalBytes;
}

/** Returns the parent document field name if configured. */
public String getParentField() {
return parentField;
Expand Down Expand Up @@ -495,6 +506,9 @@ public String toString() {
sb.append("leafSorter=").append(getLeafSorter()).append("\n");
sb.append("eventListener=").append(getIndexWriterEventListener()).append("\n");
sb.append("parentField=").append(getParentField()).append("\n");
sb.append("mergeAbortCheckIntervalBytes=")
.append(getMergeAbortCheckIntervalBytes())
.append("\n");
return sb.toString();
}
}
47 changes: 47 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,53 @@ public MergeException(Throwable exc) {
}
}

/**
* Checks whether a merge has been aborted. {@link #checkAborted()} throws {@link
* MergeAbortedException} if the merge should stop.
*
* @lucene.experimental
*/
public static final class AbortChecker {

/**
 * Factory for {@link AbortChecker} instances. A real checker is only built when {@code merge} is
 * non-null and {@code abortCheckIntervalBytes} is positive; otherwise the shared {@link
 * AbortChecker#NO_OP} instance is returned.
 *
 * @param merge the merge to check for abort, may be {@code null}
 * @param abortCheckIntervalBytes interval in bytes between abort checks; zero or negative
 *     disables checking
 * @return a checker bound to {@code merge}, or {@link AbortChecker#NO_OP}
 */
public static AbortChecker create(OneMerge merge, int abortCheckIntervalBytes) {
  final boolean disabled = merge == null || abortCheckIntervalBytes <= 0;
  return disabled ? AbortChecker.NO_OP : new AbortChecker(merge, abortCheckIntervalBytes);
}

/** A no-op checker: it wraps no merge, so {@link #checkAborted()} never throws. */
public static final AbortChecker NO_OP = new AbortChecker(null, 0);

private final OneMerge oneMerge;
private final int abortCheckIntervalBytes;

public AbortChecker(OneMerge oneMerge, int abortCheckIntervalBytes) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this private and do all construction via the factory method.

assert oneMerge != null || abortCheckIntervalBytes == 0 : abortCheckIntervalBytes;
this.oneMerge = oneMerge;
this.abortCheckIntervalBytes = abortCheckIntervalBytes;
}

/**
 * Delegates to {@link OneMerge#checkAborted()} of the wrapped merge; does nothing when this
 * checker wraps no merge.
 *
 * @throws MergeAbortedException propagated from the wrapped merge's abort check
 */
public void checkAborted() throws MergeAbortedException {
  if (oneMerge == null) {
    return;
  }
  oneMerge.checkAborted();
}

/**
* Returns the interval in bytes between abort checks during merge integrity verification.
*
* @return the value given at construction time; {@link #NO_OP} is built with 0
*/
public int getAbortCheckIntervalBytes() {
return abortCheckIntervalBytes;
}
}

/**
* Thrown when a merge was explicitly aborted because {@link IndexWriter#abortMerges} was called.
* Normally this exception is privately caught and suppressed by {@link IndexWriter}.
Expand Down
Loading
Loading