Skip to content

Commit 51fa15a

Browse files
committed
[server] Fix TabletServer fails to restart after unclean shutdown due to EOFException in log recovery
1 parent bdbbbce commit 51fa15a

3 files changed

Lines changed: 142 additions & 131 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
2424
import org.apache.fluss.exception.LogStorageException;
2525
import org.apache.fluss.metadata.LogFormat;
26-
import org.apache.fluss.server.exception.CorruptIndexException;
2726
import org.apache.fluss.utils.FlussPaths;
2827
import org.apache.fluss.utils.types.Tuple2;
2928

@@ -137,6 +136,7 @@ public LoadedLogOffsets load() throws IOException {
137136
*/
138137
private Tuple2<Long, Long> recoverLog() throws IOException {
139138
if (!isCleanShutdown) {
139+
long recoverLogStart = System.currentTimeMillis();
140140
List<LogSegment> unflushed =
141141
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
142142
int numUnflushed = unflushed.size();
@@ -153,46 +153,41 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
153153
numUnflushed,
154154
logSegments.getTableBucket());
155155

156+
int truncatedBytes = -1;
156157
try {
157-
segment.sanityCheck();
158-
} catch (NoSuchFileException | CorruptIndexException e) {
158+
truncatedBytes = recoverSegment(segment);
159+
} catch (InvalidOffsetException e) {
160+
long startOffset = segment.getBaseOffset();
159161
LOG.warn(
160-
"Found invalid index file corresponding log file {} for bucket {}, "
161-
+ "recovering segment and rebuilding index files...",
162-
segment.getFileLogRecords().file().getAbsoluteFile(),
162+
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
163+
+ "and creating an empty one with starting offset {}",
163164
logSegments.getTableBucket(),
164-
e);
165-
166-
int truncatedBytes = -1;
167-
try {
168-
truncatedBytes = recoverSegment(segment);
169-
} catch (InvalidOffsetException invalidOffsetException) {
170-
long startOffset = segment.getBaseOffset();
171-
LOG.warn(
172-
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
173-
+ "and creating an empty one with starting offset {}",
174-
logSegments.getTableBucket(),
175-
startOffset);
176-
truncatedBytes = segment.truncateTo(startOffset);
177-
}
165+
startOffset);
166+
truncatedBytes = segment.truncateTo(startOffset);
167+
}
178168

179-
if (truncatedBytes > 0) {
180-
// we had an invalid message, delete all remaining log
181-
LOG.warn(
182-
"Corruption found in segment {} for bucket {}, truncating to offset {}",
183-
segment.getBaseOffset(),
184-
logSegments.getTableBucket(),
185-
segment.readNextOffset());
186-
removeAndDeleteSegments(unflushedIter);
187-
truncated = true;
188-
}
169+
if (truncatedBytes > 0) {
170+
// we had an invalid message, delete all remaining log
171+
LOG.warn(
172+
"Corruption found in segment {} for bucket {}, truncating to offset {}",
173+
segment.getBaseOffset(),
174+
logSegments.getTableBucket(),
175+
segment.readNextOffset());
176+
removeAndDeleteSegments(unflushedIter);
177+
truncated = true;
178+
} else {
179+
numFlushed += 1;
189180
}
190-
numFlushed += 1;
191181
}
182+
long recoverLogEnd = System.currentTimeMillis();
183+
LOG.info(
184+
"Log recovery completed for bucket {} in {} ms",
185+
logSegments.getTableBucket(),
186+
recoverLogEnd - recoverLogStart);
192187
}
193188

194-
// TODO truncate log to recover maybe unflush segments.
195189
if (logSegments.isEmpty()) {
190+
// TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
196191
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
197192
}
198193
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
@@ -294,6 +289,20 @@ private void loadSegmentFiles() throws IOException {
294289
long baseOffset = FlussPaths.offsetFromFile(file);
295290
LogSegment segment =
296291
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
292+
293+
try {
294+
segment.sanityCheck();
295+
} catch (NoSuchFileException e) {
296+
if (isCleanShutdown
297+
|| segment.getBaseOffset() < recoveryPointCheckpoint) {
298+
LOG.error(
299+
"Could not find offset index file corresponding to log file {} "
300+
+ "for bucket {}, recovering segment and rebuilding index files...",
301+
logSegments.getTableBucket(),
302+
segment.getFileLogRecords().file().getAbsoluteFile());
303+
}
304+
recoverSegment(segment);
305+
}
297306
logSegments.add(segment);
298307
}
299308
}

fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,15 +180,17 @@ public void sanityCheck() throws IOException {
180180
+ lazyOffsetIndex.file().getAbsolutePath()
181181
+ " does not exist.");
182182
}
183-
lazyOffsetIndex.get().sanityCheck();
184183

185184
if (!lazyTimeIndex.file().exists()) {
186185
throw new NoSuchFileException(
187186
"Time index file "
188187
+ lazyTimeIndex.file().getAbsolutePath()
189188
+ " does not exist.");
190189
}
191-
lazyTimeIndex.get().sanityCheck();
190+
191+
// Sanity checks for time index and offset index are skipped because
192+
// we will recover the segments above the recovery point in recoverLog()
193+
// in any case so sanity checking them here is redundant.
192194
}
193195

194196
/**

fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java

Lines changed: 97 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,6 @@ void testInvalidOffsetRebuild() throws Exception {
268268
corruptSegment.getFileLogRecords().append(memoryLogRecords);
269269
logTablet.close();
270270

271-
// delete the index file to trigger the recovery
272-
corruptSegment.offsetIndex().deleteIfExists();
273-
274271
logTablet = createLogTablet(false);
275272
// the corrupt segment should be truncated to base offset
276273
assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
@@ -308,9 +305,6 @@ void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws Exception
308305
.sorted())
309306
.containsExactly(2L, 4L);
310307

311-
// Delete all offset index files to trigger segment recover
312-
deleteAllOffsetIndexFile(log);
313-
314308
log = createLogTablet(false);
315309
assertThat(
316310
WriterStateManager.listSnapshotFiles(logDir).stream()
@@ -408,7 +402,7 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
408402
// after this step, the segments should be
409403
// [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
410404
// writer id 1 2 1 2 1 2
411-
// se1 id 0 0 1 1 2 2
405+
// seq id 0 0 1 1 2 2
412406
// snapshot 1 2 3 4 5 6
413407
for (int i = 0; i <= 5; i++) {
414408
if (i % 2 == 0) {
@@ -473,22 +467,19 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
473467
long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4);
474468
assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint);
475469

476-
// 1. Test unclean shut without any recovery
470+
// 1. Test unclean shutdown
477471
// Retain snapshots for the last 2 segments (delete snapshots before that)
478472
long snapshotRetentionOffset = segmentOffsets.get(segmentOffsets.size() - 2);
479473
log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset);
480474
log.close();
481475

482-
// Reopen the log with recovery point. Although we use unclean shutdown here,
483-
// all the index files are correctly close, so Fluss will not trigger recover for any
484-
// segment.
476+
// Reopen the log with recovery point. Since we use unclean shutdown here,
477+
// all the log segments after recovery point will be recovered.
485478
log = createLogTablet(false, recoveryPoint);
486479

487-
// Expected snapshot offsets: last 2 segment base offsets + log end offset
488-
List<Long> lastTowSegmentOffsets =
489-
segmentOffsets.subList(
490-
Math.max(0, segmentOffsets.size() - 2), segmentOffsets.size());
491-
expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets);
480+
// Expected snapshot offsets: segments base offset after recovery point
481+
expectedSnapshotOffsets =
482+
new HashSet<>(segmentOffsets.subList((int) recoveryPoint, segmentOffsets.size()));
492483
expectedSnapshotOffsets.add(log.localLogEndOffset());
493484

494485
// Verify that expected snapshot offsets exist
@@ -509,71 +500,6 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
509500
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
510501
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
511502
log.close();
512-
513-
// 2. Test unclean shut down without recover segment will rebuild writer state
514-
// Delete all snapshot files
515-
deleteAllWriterSnapshot(logDir);
516-
517-
// Reopen the log with recovery point. Although Fluss will not trigger recover for any
518-
// segment, but the writer state should be rebuilt.
519-
log = createLogTablet(false, recoveryPoint);
520-
521-
actualSnapshotOffsets =
522-
WriterStateManager.listSnapshotFiles(logDir).stream()
523-
.map(snapshotFile -> snapshotFile.offset)
524-
.collect(Collectors.toSet());
525-
// Will rebuild writer state for all segments, but only take snapshot for the last 2
526-
// segments and the last offset
527-
expectedSnapshotOffsets = new HashSet<>();
528-
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 2));
529-
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 1));
530-
expectedSnapshotOffsets.add(log.localLogEndOffset());
531-
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
532-
533-
// Verify that expected writers last batch sequence
534-
actualWritersLastBatchSequence =
535-
log.writerStateManager().activeWriters().entrySet().stream()
536-
.collect(
537-
Collectors.toMap(
538-
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
539-
expectedWritersLastBatchSequence = new HashMap<>();
540-
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
541-
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
542-
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
543-
log.close();
544-
545-
// 3. Test unclean shut down with recover segment will rebuild writer state for each segment
546-
// after recovery point.
547-
// Delete all snapshot files and index files (to trigger segment recover)
548-
deleteAllWriterSnapshot(logDir);
549-
deleteAllOffsetIndexFile(log);
550-
551-
// Reopen the log with recovery point. All segments will be recovered since we delete all
552-
// the index files
553-
log = createLogTablet(false, recoveryPoint);
554-
555-
// Writer snapshot files should be rebuilt for each segment after recovery point
556-
actualSnapshotOffsets =
557-
WriterStateManager.listSnapshotFiles(logDir).stream()
558-
.map(snapshotFile -> snapshotFile.offset)
559-
.collect(Collectors.toSet());
560-
expectedSnapshotOffsets =
561-
log.logSegments(recoveryPoint, log.localLogEndOffset()).stream()
562-
.map(LogSegment::getBaseOffset)
563-
.collect(Collectors.toSet());
564-
expectedSnapshotOffsets.add(log.localLogEndOffset());
565-
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
566-
567-
actualWritersLastBatchSequence =
568-
log.writerStateManager().activeWriters().entrySet().stream()
569-
.collect(
570-
Collectors.toMap(
571-
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
572-
expectedWritersLastBatchSequence = new HashMap<>();
573-
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
574-
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
575-
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
576-
log.close();
577503
}
578504

579505
@Test
@@ -627,6 +553,89 @@ void testLoadingLogKeepsLargestStrayWriterStateSnapshot() throws Exception {
627553
.containsExactly(1L, 2L, 4L);
628554
}
629555

556+
/**
557+
* Tests that log recovery is correctly triggered after an unclean shutdown (broker crash).
558+
*
559+
* <p>The test simulates an unclean shutdown by:
560+
*
561+
* <ol>
562+
* <li>Writing records to a log to create multiple segments
563+
* <li>Recording the logEndOffset before corruption
564+
* <li>Corrupting the index files (appending garbage bytes, simulating incomplete flush)
565+
* <li>Appending garbage bytes to the active .log file tail (simulating partial batch write)
566+
* <li>Reloading the log with {@code isCleanShutdown=false}
567+
* <li>Verifying that recovery truncates the corrupt tail, rebuilds indexes, and restores the
568+
* correct logEndOffset
569+
* </ol>
570+
*/
571+
@Test
572+
void testLogRecoveryIsCalledUponBrokerCrash() throws Exception {
573+
// Step 1: Create log and write enough records to produce multiple segments
574+
int numRecords = 200;
575+
LogTablet logTablet = createLogTablet(true);
576+
appendRecords(logTablet, numRecords);
577+
578+
// Step 2: Record the expected logEndOffset after clean writes
579+
long expectedLogEndOffset = logTablet.localLogEndOffset();
580+
assertThat(expectedLogEndOffset).isEqualTo(numRecords);
581+
582+
// Step 3: Collect index files and the active segment's .log file before "crash"
583+
List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
584+
File activeLogFile = logTablet.activeLogSegment().getFileLogRecords().file();
585+
586+
// Step 4: Close the log (simulating the on-disk state at the moment of crash)
587+
logTablet.close();
588+
589+
// Step 5: Corrupt the index files (simulate incomplete index flush during crash)
590+
for (File indexFile : indexFiles) {
591+
try (FileChannel fileChannel =
592+
FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
593+
// Append 12 bytes of garbage – enough to make the index size non-multiple of
594+
// entrySize and trigger CorruptIndexException during sanityCheck
595+
for (int i = 0; i < 12; i++) {
596+
fileChannel.write(ByteBuffer.wrap(new byte[] {(byte) i}));
597+
}
598+
}
599+
}
600+
601+
// Step 6: Append garbage bytes to the active segment's .log file tail
602+
// (simulate a partial/incomplete batch that was being written at crash time)
603+
try (FileChannel logFileChannel =
604+
FileChannel.open(activeLogFile.toPath(), StandardOpenOption.APPEND)) {
605+
byte[] garbage = new byte[64];
606+
for (int i = 0; i < garbage.length; i++) {
607+
garbage[i] = (byte) (i & 0xFF);
608+
}
609+
logFileChannel.write(ByteBuffer.wrap(garbage));
610+
}
611+
612+
// Step 7: Reload with isCleanShutdown=false – this must trigger log recovery
613+
logTablet = createLogTablet(false);
614+
615+
// Step 8: Verify that recovery succeeded and the logEndOffset is restored correctly.
616+
// Recovery should truncate the garbage bytes appended to the .log file and rebuild
617+
// indexes, so the final offset must equal the number of valid records written.
618+
assertThat(logTablet.localLogEndOffset()).isEqualTo(expectedLogEndOffset);
619+
620+
// Step 9: Verify all index files pass sanity check after recovery
621+
for (LogSegment segment : logTablet.logSegments()) {
622+
segment.offsetIndex().sanityCheck();
623+
segment.timeIndex().sanityCheck();
624+
}
625+
626+
// Step 10: Verify that all previously written records are still readable after recovery
627+
for (int i = 0; i < numRecords; i++) {
628+
assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10L))
629+
.isEqualTo(i);
630+
}
631+
632+
// Step 11: Verify that the log is still writable after recovery
633+
appendRecords(logTablet, 1, (int) expectedLogEndOffset);
634+
assertThat(logTablet.localLogEndOffset()).isEqualTo(expectedLogEndOffset + 1);
635+
636+
logTablet.close();
637+
}
638+
630639
private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception {
631640
return createLogTablet(isCleanShutdown, 0);
632641
}
@@ -648,8 +657,13 @@ private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint)
648657
}
649658

650659
private void appendRecords(LogTablet logTablet, int numRecords) throws Exception {
651-
int baseOffset = 0;
652-
int batchSequence = 0;
660+
appendRecords(logTablet, numRecords, 0);
661+
}
662+
663+
private void appendRecords(LogTablet logTablet, int numRecords, int startOffset)
664+
throws Exception {
665+
int baseOffset = startOffset;
666+
int batchSequence = startOffset;
653667
for (int i = 0; i < numRecords; i++) {
654668
List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
655669
List<ChangeType> changeTypes =
@@ -683,18 +697,4 @@ private List<File> collectIndexFiles(List<LogSegment> logSegments) throws IOExce
683697
}
684698
return indexFiles;
685699
}
686-
687-
private void deleteAllOffsetIndexFile(LogTablet logTablet) throws IOException {
688-
List<LogSegment> logSegments = logTablet.logSegments();
689-
for (LogSegment segment : logSegments) {
690-
segment.offsetIndex().deleteIfExists();
691-
}
692-
}
693-
694-
private void deleteAllWriterSnapshot(File logDir) throws Exception {
695-
List<SnapshotFile> snapshotFiles = WriterStateManager.listSnapshotFiles(logDir);
696-
for (SnapshotFile snapshotFile : snapshotFiles) {
697-
snapshotFile.deleteIfExists();
698-
}
699-
}
700700
}

0 commit comments

Comments (0)