From 51fa15ae72b492d3b5bce7ca9544e105b4470c53 Mon Sep 17 00:00:00 2001 From: Liebing Date: Thu, 26 Mar 2026 21:17:42 +0800 Subject: [PATCH] [server] Fix TabletServer fails to restart after unclean shutdown due to EOFException in log recovery --- .../apache/fluss/server/log/LogLoader.java | 73 ++++--- .../apache/fluss/server/log/LogSegment.java | 6 +- .../fluss/server/log/LogLoaderTest.java | 194 +++++++++--------- 3 files changed, 142 insertions(+), 131 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java index 572fbd3041..0b73ef0c7b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java @@ -23,7 +23,6 @@ import org.apache.fluss.exception.LogSegmentOffsetOverflowException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.metadata.LogFormat; -import org.apache.fluss.server.exception.CorruptIndexException; import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.types.Tuple2; @@ -137,6 +136,7 @@ public LoadedLogOffsets load() throws IOException { */ private Tuple2 recoverLog() throws IOException { if (!isCleanShutdown) { + long recoverLogStart = System.currentTimeMillis(); List unflushed = logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE); int numUnflushed = unflushed.size(); @@ -153,46 +153,41 @@ private Tuple2 recoverLog() throws IOException { numUnflushed, logSegments.getTableBucket()); + int truncatedBytes = -1; try { - segment.sanityCheck(); - } catch (NoSuchFileException | CorruptIndexException e) { + truncatedBytes = recoverSegment(segment); + } catch (InvalidOffsetException e) { + long startOffset = segment.getBaseOffset(); LOG.warn( - "Found invalid index file corresponding log file {} for bucket {}, " - + "recovering segment and rebuilding index files...", - 
segment.getFileLogRecords().file().getAbsoluteFile(), + "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment " + + "and creating an empty one with starting offset {}", logSegments.getTableBucket(), - e); - - int truncatedBytes = -1; - try { - truncatedBytes = recoverSegment(segment); - } catch (InvalidOffsetException invalidOffsetException) { - long startOffset = segment.getBaseOffset(); - LOG.warn( - "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment " - + "and creating an empty one with starting offset {}", - logSegments.getTableBucket(), - startOffset); - truncatedBytes = segment.truncateTo(startOffset); - } + startOffset); + truncatedBytes = segment.truncateTo(startOffset); + } - if (truncatedBytes > 0) { - // we had an invalid message, delete all remaining log - LOG.warn( - "Corruption found in segment {} for bucket {}, truncating to offset {}", - segment.getBaseOffset(), - logSegments.getTableBucket(), - segment.readNextOffset()); - removeAndDeleteSegments(unflushedIter); - truncated = true; - } + if (truncatedBytes > 0) { + // we had an invalid message, delete all remaining log + LOG.warn( + "Corruption found in segment {} for bucket {}, truncating to offset {}", + segment.getBaseOffset(), + logSegments.getTableBucket(), + segment.readNextOffset()); + removeAndDeleteSegments(unflushedIter); + truncated = true; + } else { + numFlushed += 1; } - numFlushed += 1; } + long recoverLogEnd = System.currentTimeMillis(); + LOG.info( + "Log recovery completed for bucket {} in {} ms", + logSegments.getTableBucket(), + recoverLogEnd - recoverLogStart); } - // TODO truncate log to recover maybe unflush segments. 
if (logSegments.isEmpty()) {
+            // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
@@ -294,6 +289,20 @@ private void loadSegmentFiles() throws IOException {
                 long baseOffset = FlussPaths.offsetFromFile(file);
                 LogSegment segment =
                         LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+
+                try {
+                    segment.sanityCheck();
+                } catch (NoSuchFileException e) {
+                    if (isCleanShutdown
+                            || segment.getBaseOffset() < recoveryPointCheckpoint) {
+                        LOG.error(
+                                "Could not find offset index file corresponding to log file {} "
+                                        + "for bucket {}, recovering segment and rebuilding index files...",
+                                segment.getFileLogRecords().file().getAbsoluteFile(),
+                                logSegments.getTableBucket());
+                    }
+                    recoverSegment(segment);
+                }
                 logSegments.add(segment);
             }
         }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index c45c25e5c9..5686b14669 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -180,7 +180,6 @@ public void sanityCheck() throws IOException {
                             + lazyOffsetIndex.file().getAbsolutePath()
                             + " does not exist.");
         }
-        lazyOffsetIndex.get().sanityCheck();
 
         if (!lazyTimeIndex.file().exists()) {
             throw new NoSuchFileException(
@@ -188,7 +187,10 @@ public void sanityCheck() throws IOException {
                             + lazyTimeIndex.file().getAbsolutePath()
                             + " does not exist.");
         }
-        lazyTimeIndex.get().sanityCheck();
+
+        // Sanity checks for time index and offset index are skipped because
+        // we will recover the segments above the recovery point in recoverLog()
+        // in any case so sanity checking them here is redundant.
} /** diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java index 03d2ddaddd..2c4d525737 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java @@ -268,9 +268,6 @@ void testInvalidOffsetRebuild() throws Exception { corruptSegment.getFileLogRecords().append(memoryLogRecords); logTablet.close(); - // delete the index file to trigger the recovery - corruptSegment.offsetIndex().deleteIfExists(); - logTablet = createLogTablet(false); // the corrupt segment should be truncated to base offset assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset()); @@ -308,9 +305,6 @@ void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws Exception .sorted()) .containsExactly(2L, 4L); - // Delete all offset index files to trigger segment recover - deleteAllOffsetIndexFile(log); - log = createLogTablet(false); assertThat( WriterStateManager.listSnapshotFiles(logDir).stream() @@ -408,7 +402,7 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception { // after this step, the segments should be // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-] // writer id 1 2 1 2 1 2 - // se1 id 0 0 1 1 2 2 + // seq id 0 0 1 1 2 2 // snapshot 1 2 3 4 5 6 for (int i = 0; i <= 5; i++) { if (i % 2 == 0) { @@ -473,22 +467,19 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception { long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4); assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint); - // 1. Test unclean shut without any recovery + // 1. 
Test unclean shutdown // Retain snapshots for the last 2 segments (delete snapshots before that) long snapshotRetentionOffset = segmentOffsets.get(segmentOffsets.size() - 2); log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset); log.close(); - // Reopen the log with recovery point. Although we use unclean shutdown here, - // all the index files are correctly close, so Fluss will not trigger recover for any - // segment. + // Reopen the log with recovery point. Since we use unclean shutdown here, + // all the log segments after recovery point will be recovered. log = createLogTablet(false, recoveryPoint); - // Expected snapshot offsets: last 2 segment base offsets + log end offset - List lastTowSegmentOffsets = - segmentOffsets.subList( - Math.max(0, segmentOffsets.size() - 2), segmentOffsets.size()); - expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets); + // Expected snapshot offsets: segments base offset after recovery point + expectedSnapshotOffsets = + new HashSet<>(segmentOffsets.subList((int) recoveryPoint, segmentOffsets.size())); expectedSnapshotOffsets.add(log.localLogEndOffset()); // Verify that expected snapshot offsets exist @@ -509,71 +500,6 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception { expectedWritersLastBatchSequence.put(wid2, seq2 - 1); assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence); log.close(); - - // 2. Test unclean shut down without recover segment will rebuild writer state - // Delete all snapshot files - deleteAllWriterSnapshot(logDir); - - // Reopen the log with recovery point. Although Fluss will not trigger recover for any - // segment, but the writer state should be rebuilt. 
- log = createLogTablet(false, recoveryPoint); - - actualSnapshotOffsets = - WriterStateManager.listSnapshotFiles(logDir).stream() - .map(snapshotFile -> snapshotFile.offset) - .collect(Collectors.toSet()); - // Will rebuild writer state for all segments, but only take snapshot for the last 2 - // segments and the last offset - expectedSnapshotOffsets = new HashSet<>(); - expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 2)); - expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 1)); - expectedSnapshotOffsets.add(log.localLogEndOffset()); - assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets); - - // Verify that expected writers last batch sequence - actualWritersLastBatchSequence = - log.writerStateManager().activeWriters().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, e -> e.getValue().lastBatchSequence())); - expectedWritersLastBatchSequence = new HashMap<>(); - expectedWritersLastBatchSequence.put(wid1, seq1 - 1); - expectedWritersLastBatchSequence.put(wid2, seq2 - 1); - assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence); - log.close(); - - // 3. Test unclean shut down with recover segment will rebuild writer state for each segment - // after recovery point. - // Delete all snapshot files and index files (to trigger segment recover) - deleteAllWriterSnapshot(logDir); - deleteAllOffsetIndexFile(log); - - // Reopen the log with recovery point. 
All segments will be recovered since we delete all - // the index files - log = createLogTablet(false, recoveryPoint); - - // Writer snapshot files should be rebuilt for each segment after recovery point - actualSnapshotOffsets = - WriterStateManager.listSnapshotFiles(logDir).stream() - .map(snapshotFile -> snapshotFile.offset) - .collect(Collectors.toSet()); - expectedSnapshotOffsets = - log.logSegments(recoveryPoint, log.localLogEndOffset()).stream() - .map(LogSegment::getBaseOffset) - .collect(Collectors.toSet()); - expectedSnapshotOffsets.add(log.localLogEndOffset()); - assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets); - - actualWritersLastBatchSequence = - log.writerStateManager().activeWriters().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, e -> e.getValue().lastBatchSequence())); - expectedWritersLastBatchSequence = new HashMap<>(); - expectedWritersLastBatchSequence.put(wid1, seq1 - 1); - expectedWritersLastBatchSequence.put(wid2, seq2 - 1); - assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence); - log.close(); } @Test @@ -627,6 +553,89 @@ void testLoadingLogKeepsLargestStrayWriterStateSnapshot() throws Exception { .containsExactly(1L, 2L, 4L); } + /** + * Tests that log recovery is correctly triggered after an unclean shutdown (broker crash). + * + *

The test simulates an unclean shutdown by: + * + *

    + *
  1. Writing records to a log to create multiple segments + *
  2. Recording the logEndOffset before corruption + *
  3. Corrupting the index files (appending garbage bytes, simulating incomplete flush) + *
  4. Appending garbage bytes to the active .log file tail (simulating partial batch write) + *
  5. Reloading the log with {@code isCleanShutdown=false} + *
  6. Verifying that recovery truncates the corrupt tail, rebuilds indexes, and restores the + * correct logEndOffset + *
+ */ + @Test + void testLogRecoveryIsCalledUponBrokerCrash() throws Exception { + // Step 1: Create log and write enough records to produce multiple segments + int numRecords = 200; + LogTablet logTablet = createLogTablet(true); + appendRecords(logTablet, numRecords); + + // Step 2: Record the expected logEndOffset after clean writes + long expectedLogEndOffset = logTablet.localLogEndOffset(); + assertThat(expectedLogEndOffset).isEqualTo(numRecords); + + // Step 3: Collect index files and the active segment's .log file before "crash" + List indexFiles = collectIndexFiles(logTablet.logSegments()); + File activeLogFile = logTablet.activeLogSegment().getFileLogRecords().file(); + + // Step 4: Close the log (simulating the on-disk state at the moment of crash) + logTablet.close(); + + // Step 5: Corrupt the index files (simulate incomplete index flush during crash) + for (File indexFile : indexFiles) { + try (FileChannel fileChannel = + FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) { + // Append 12 bytes of garbage – enough to make the index size non-multiple of + // entrySize and trigger CorruptIndexException during sanityCheck + for (int i = 0; i < 12; i++) { + fileChannel.write(ByteBuffer.wrap(new byte[] {(byte) i})); + } + } + } + + // Step 6: Append garbage bytes to the active segment's .log file tail + // (simulate a partial/incomplete batch that was being written at crash time) + try (FileChannel logFileChannel = + FileChannel.open(activeLogFile.toPath(), StandardOpenOption.APPEND)) { + byte[] garbage = new byte[64]; + for (int i = 0; i < garbage.length; i++) { + garbage[i] = (byte) (i & 0xFF); + } + logFileChannel.write(ByteBuffer.wrap(garbage)); + } + + // Step 7: Reload with isCleanShutdown=false – this must trigger log recovery + logTablet = createLogTablet(false); + + // Step 8: Verify that recovery succeeded and the logEndOffset is restored correctly. 
+ // Recovery should truncate the garbage bytes appended to the .log file and rebuild + // indexes, so the final offset must equal the number of valid records written. + assertThat(logTablet.localLogEndOffset()).isEqualTo(expectedLogEndOffset); + + // Step 9: Verify all index files pass sanity check after recovery + for (LogSegment segment : logTablet.logSegments()) { + segment.offsetIndex().sanityCheck(); + segment.timeIndex().sanityCheck(); + } + + // Step 10: Verify that all previously written records are still readable after recovery + for (int i = 0; i < numRecords; i++) { + assertThat(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10L)) + .isEqualTo(i); + } + + // Step 11: Verify that the log is still writable after recovery + appendRecords(logTablet, 1, (int) expectedLogEndOffset); + assertThat(logTablet.localLogEndOffset()).isEqualTo(expectedLogEndOffset + 1); + + logTablet.close(); + } + private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception { return createLogTablet(isCleanShutdown, 0); } @@ -648,8 +657,13 @@ private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint) } private void appendRecords(LogTablet logTablet, int numRecords) throws Exception { - int baseOffset = 0; - int batchSequence = 0; + appendRecords(logTablet, numRecords, 0); + } + + private void appendRecords(LogTablet logTablet, int numRecords, int startOffset) + throws Exception { + int baseOffset = startOffset; + int batchSequence = startOffset; for (int i = 0; i < numRecords; i++) { List objects = Collections.singletonList(new Object[] {1, "a"}); List changeTypes = @@ -683,18 +697,4 @@ private List collectIndexFiles(List logSegments) throws IOExce } return indexFiles; } - - private void deleteAllOffsetIndexFile(LogTablet logTablet) throws IOException { - List logSegments = logTablet.logSegments(); - for (LogSegment segment : logSegments) { - segment.offsetIndex().deleteIfExists(); - } - } - - private void deleteAllWriterSnapshot(File 
logDir) throws Exception { - List snapshotFiles = WriterStateManager.listSnapshotFiles(logDir); - for (SnapshotFile snapshotFile : snapshotFiles) { - snapshotFile.deleteIfExists(); - } - } }