Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.server.exception.CorruptIndexException;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.types.Tuple2;

Expand Down Expand Up @@ -137,6 +136,7 @@ public LoadedLogOffsets load() throws IOException {
*/
private Tuple2<Long, Long> recoverLog() throws IOException {
if (!isCleanShutdown) {
long recoverLogStart = System.currentTimeMillis();
List<LogSegment> unflushed =
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
int numUnflushed = unflushed.size();
Expand All @@ -153,46 +153,41 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
numUnflushed,
logSegments.getTableBucket());

int truncatedBytes = -1;
try {
segment.sanityCheck();
} catch (NoSuchFileException | CorruptIndexException e) {
truncatedBytes = recoverSegment(segment);
} catch (InvalidOffsetException e) {
long startOffset = segment.getBaseOffset();
LOG.warn(
"Found invalid index file corresponding log file {} for bucket {}, "
+ "recovering segment and rebuilding index files...",
segment.getFileLogRecords().file().getAbsoluteFile(),
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+ "and creating an empty one with starting offset {}",
logSegments.getTableBucket(),
e);

int truncatedBytes = -1;
try {
truncatedBytes = recoverSegment(segment);
} catch (InvalidOffsetException invalidOffsetException) {
long startOffset = segment.getBaseOffset();
LOG.warn(
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+ "and creating an empty one with starting offset {}",
logSegments.getTableBucket(),
startOffset);
truncatedBytes = segment.truncateTo(startOffset);
}
startOffset);
truncatedBytes = segment.truncateTo(startOffset);
}

if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
LOG.warn(
"Corruption found in segment {} for bucket {}, truncating to offset {}",
segment.getBaseOffset(),
logSegments.getTableBucket(),
segment.readNextOffset());
removeAndDeleteSegments(unflushedIter);
truncated = true;
}
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
LOG.warn(
"Corruption found in segment {} for bucket {}, truncating to offset {}",
segment.getBaseOffset(),
logSegments.getTableBucket(),
segment.readNextOffset());
removeAndDeleteSegments(unflushedIter);
truncated = true;
} else {
numFlushed += 1;
}
numFlushed += 1;
}
long recoverLogEnd = System.currentTimeMillis();
LOG.info(
"Log recovery completed for bucket {} in {} ms",
logSegments.getTableBucket(),
recoverLogEnd - recoverLogStart);
}

// TODO truncate log to recover maybe unflush segments.
if (logSegments.isEmpty()) {
// TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
}
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
Expand Down Expand Up @@ -294,6 +289,20 @@ private void loadSegmentFiles() throws IOException {
long baseOffset = FlussPaths.offsetFromFile(file);
LogSegment segment =
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);

// Check that the segment's index files are present. A missing index file is reported
// by sanityCheck() as NoSuchFileException; in that case the segment is recovered so
// its index files are rebuilt.
try {
    segment.sanityCheck();
} catch (NoSuchFileException e) {
    // Only log an error when the missing index is unexpected: after a clean shutdown,
    // or for a segment below the recovery point checkpoint.
    if (isCleanShutdown || segment.getBaseOffset() < recoveryPointCheckpoint) {
        // Argument order must follow the placeholders: log file first, then bucket.
        // The exception is passed last so the actual missing file is reported too.
        LOG.error(
                "Could not find offset index file corresponding to log file {} "
                        + "for bucket {}, recovering segment and rebuilding index files...",
                segment.getFileLogRecords().file().getAbsoluteFile(),
                logSegments.getTableBucket(),
                e);
    }
    recoverSegment(segment);
}
logSegments.add(segment);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,17 @@ public void sanityCheck() throws IOException {
+ lazyOffsetIndex.file().getAbsolutePath()
+ " does not exist.");
}
lazyOffsetIndex.get().sanityCheck();

if (!lazyTimeIndex.file().exists()) {
throw new NoSuchFileException(
"Time index file "
+ lazyTimeIndex.file().getAbsolutePath()
+ " does not exist.");
}
lazyTimeIndex.get().sanityCheck();

// Sanity checks for time index and offset index are skipped because
// we will recover the segments above the recovery point in recoverLog()
// in any case so sanity checking them here is redundant.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void testInvalidOffsetRebuild() throws Exception {
corruptSegment.getFileLogRecords().append(memoryLogRecords);
logTablet.close();

// delete the index file to trigger the recovery
corruptSegment.offsetIndex().deleteIfExists();

logTablet = createLogTablet(false);
// the corrupt segment should be truncated to base offset
assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
Expand Down Expand Up @@ -308,9 +305,6 @@ void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws Exception
.sorted())
.containsExactly(2L, 4L);

// Delete all offset index files to trigger segment recover
deleteAllOffsetIndexFile(log);

log = createLogTablet(false);
assertThat(
WriterStateManager.listSnapshotFiles(logDir).stream()
Expand Down Expand Up @@ -408,7 +402,7 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
// after this step, the segments should be
// [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
// writer id 1 2 1 2 1 2
// se1 id 0 0 1 1 2 2
// seq id 0 0 1 1 2 2
// snapshot 1 2 3 4 5 6
for (int i = 0; i <= 5; i++) {
if (i % 2 == 0) {
Expand Down Expand Up @@ -473,22 +467,19 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4);
assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint);

// 1. Test unclean shut without any recovery
// 1. Test unclean shutdown
// Retain snapshots for the last 2 segments (delete snapshots before that)
long snapshotRetentionOffset = segmentOffsets.get(segmentOffsets.size() - 2);
log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset);
log.close();

// Reopen the log with recovery point. Although we use unclean shutdown here,
// all the index files are correctly close, so Fluss will not trigger recover for any
// segment.
// Reopen the log with recovery point. Since we use unclean shutdown here,
// all the log segments after recovery point will be recovered.
log = createLogTablet(false, recoveryPoint);

// Expected snapshot offsets: last 2 segment base offsets + log end offset
List<Long> lastTowSegmentOffsets =
segmentOffsets.subList(
Math.max(0, segmentOffsets.size() - 2), segmentOffsets.size());
expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets);
// Expected snapshot offsets: segments base offset after recovery point
expectedSnapshotOffsets =
new HashSet<>(segmentOffsets.subList((int) recoveryPoint, segmentOffsets.size()));
expectedSnapshotOffsets.add(log.localLogEndOffset());

// Verify that expected snapshot offsets exist
Expand All @@ -509,71 +500,6 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
log.close();

// 2. Test unclean shut down without recover segment will rebuild writer state
// Delete all snapshot files
deleteAllWriterSnapshot(logDir);

// Reopen the log with recovery point. Although Fluss will not trigger recover for any
// segment, but the writer state should be rebuilt.
log = createLogTablet(false, recoveryPoint);

actualSnapshotOffsets =
WriterStateManager.listSnapshotFiles(logDir).stream()
.map(snapshotFile -> snapshotFile.offset)
.collect(Collectors.toSet());
// Will rebuild writer state for all segments, but only take snapshot for the last 2
// segments and the last offset
expectedSnapshotOffsets = new HashSet<>();
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 2));
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 1));
expectedSnapshotOffsets.add(log.localLogEndOffset());
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);

// Verify that expected writers last batch sequence
actualWritersLastBatchSequence =
log.writerStateManager().activeWriters().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
expectedWritersLastBatchSequence = new HashMap<>();
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
log.close();

// 3. Test unclean shut down with recover segment will rebuild writer state for each segment
// after recovery point.
// Delete all snapshot files and index files (to trigger segment recover)
deleteAllWriterSnapshot(logDir);
deleteAllOffsetIndexFile(log);

// Reopen the log with recovery point. All segments will be recovered since we delete all
// the index files
log = createLogTablet(false, recoveryPoint);

// Writer snapshot files should be rebuilt for each segment after recovery point
actualSnapshotOffsets =
WriterStateManager.listSnapshotFiles(logDir).stream()
.map(snapshotFile -> snapshotFile.offset)
.collect(Collectors.toSet());
expectedSnapshotOffsets =
log.logSegments(recoveryPoint, log.localLogEndOffset()).stream()
.map(LogSegment::getBaseOffset)
.collect(Collectors.toSet());
expectedSnapshotOffsets.add(log.localLogEndOffset());
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);

actualWritersLastBatchSequence =
log.writerStateManager().activeWriters().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
expectedWritersLastBatchSequence = new HashMap<>();
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
log.close();
}

@Test
Expand Down Expand Up @@ -627,6 +553,89 @@ void testLoadingLogKeepsLargestStrayWriterStateSnapshot() throws Exception {
.containsExactly(1L, 2L, 4L);
}

/**
 * Verifies that reloading a log with {@code isCleanShutdown=false} recovers it after a
 * simulated broker crash.
 *
 * <p>Scenario:
 *
 * <ol>
 *   <li>Append records to a fresh log and remember its log end offset
 *   <li>Close the log, then append garbage bytes to every collected index file
 *   <li>Append garbage bytes to the tail of the active segment's {@code .log} file
 *   <li>Reopen the log as an unclean shutdown
 *   <li>Assert the log end offset is unchanged, every index passes its sanity check, the
 *       written records can still be looked up, and the log accepts a new append
 * </ol>
 */
@Test
void testLogRecoveryIsCalledUponBrokerCrash() throws Exception {
    // Populate a cleanly opened log.
    final int recordCount = 200;
    LogTablet tablet = createLogTablet(true);
    appendRecords(tablet, recordCount);

    // The end offset after the clean writes is what recovery has to restore.
    final long endOffsetBeforeCrash = tablet.localLogEndOffset();
    assertThat(endOffsetBeforeCrash).isEqualTo(recordCount);

    // Capture the files to damage while the tablet is still open, then close it so the
    // on-disk state can be modified.
    final List<File> filesOfIndexes = collectIndexFiles(tablet.logSegments());
    final File tailLogFile = tablet.activeLogSegment().getFileLogRecords().file();
    tablet.close();

    // Damage every index file by appending 12 single-byte writes (values 0..11).
    // NOTE(review): whether 12 extra bytes breaks the entry-size alignment of every index
    // type depends on the index entry sizes, which are not visible here - confirm.
    for (File damagedIndex : filesOfIndexes) {
        try (FileChannel indexChannel =
                FileChannel.open(damagedIndex.toPath(), StandardOpenOption.APPEND)) {
            int written = 0;
            while (written < 12) {
                indexChannel.write(ByteBuffer.wrap(new byte[] {(byte) written}));
                written++;
            }
        }
    }

    // Append 64 bytes (values 0..63) to the active segment's log file to imitate a batch
    // that was only partially written when the crash happened.
    final byte[] partialBatch = new byte[64];
    for (int pos = 0; pos < partialBatch.length; pos++) {
        partialBatch[pos] = (byte) (pos & 0xFF);
    }
    try (FileChannel tailChannel =
            FileChannel.open(tailLogFile.toPath(), StandardOpenOption.APPEND)) {
        tailChannel.write(ByteBuffer.wrap(partialBatch));
    }

    // Reopen as an unclean shutdown; this is the step expected to run log recovery.
    tablet = createLogTablet(false);

    // The garbage tail must be gone: the end offset equals the count of valid records.
    assertThat(tablet.localLogEndOffset()).isEqualTo(endOffsetBeforeCrash);

    // Both index types of every segment must pass their sanity check after recovery.
    for (LogSegment recovered : tablet.logSegments()) {
        recovered.offsetIndex().sanityCheck();
        recovered.timeIndex().sanityCheck();
    }

    // Every record written before the crash must still be found by timestamp lookup.
    for (int record = 0; record < recordCount; record++) {
        final long lookupTimestamp = clock.milliseconds() + record * 10L;
        assertThat(tablet.lookupOffsetForTimestamp(lookupTimestamp)).isEqualTo(record);
    }

    // The recovered log must still accept appends at the restored end offset.
    appendRecords(tablet, 1, (int) endOffsetBeforeCrash);
    assertThat(tablet.localLogEndOffset()).isEqualTo(endOffsetBeforeCrash + 1);

    tablet.close();
}

/**
 * Creates a log tablet with a recovery point of 0 by delegating to
 * {@code createLogTablet(boolean, long)}.
 *
 * @param isCleanShutdown forwarded unchanged to the two-argument overload
 * @return the log tablet returned by the two-argument overload
 */
private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception {
    final long recoveryPointAtStart = 0;
    return createLogTablet(isCleanShutdown, recoveryPointAtStart);
}
Expand All @@ -648,8 +657,13 @@ private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint)
}

/**
 * Appends {@code numRecords} records to the given log tablet, starting at offset 0, by
 * delegating to {@code appendRecords(LogTablet, int, int)}.
 *
 * @param logTablet the tablet to append to
 * @param numRecords the number of records to append
 */
private void appendRecords(LogTablet logTablet, int numRecords) throws Exception {
    // The start offset is supplied directly to the overload; no local state is needed here.
    appendRecords(logTablet, numRecords, 0);
}

private void appendRecords(LogTablet logTablet, int numRecords, int startOffset)
throws Exception {
int baseOffset = startOffset;
int batchSequence = startOffset;
for (int i = 0; i < numRecords; i++) {
List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
List<ChangeType> changeTypes =
Expand Down Expand Up @@ -683,18 +697,4 @@ private List<File> collectIndexFiles(List<LogSegment> logSegments) throws IOExce
}
return indexFiles;
}

/**
 * Calls {@code deleteIfExists()} on the offset index of every segment of the given tablet.
 *
 * @param logTablet the tablet whose segments' offset indexes are deleted
 * @throws IOException if deleting an offset index fails
 */
private void deleteAllOffsetIndexFile(LogTablet logTablet) throws IOException {
    for (LogSegment logSegment : logTablet.logSegments()) {
        logSegment.offsetIndex().deleteIfExists();
    }
}

/**
 * Calls {@code deleteIfExists()} on every writer snapshot file that
 * {@code WriterStateManager.listSnapshotFiles} reports for the given directory.
 *
 * @param logDir the directory whose writer snapshot files are deleted
 */
private void deleteAllWriterSnapshot(File logDir) throws Exception {
    for (SnapshotFile writerSnapshot : WriterStateManager.listSnapshotFiles(logDir)) {
        writerSnapshot.deleteIfExists();
    }
}
}