Skip to content

Commit bb223ea

Browse files
committed
[server] Fix TabletServer fails to restart after unclean shutdown due to EOFException in log recovery
1 parent d4748f2 commit bb223ea

3 files changed

Lines changed: 52 additions & 122 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
2424
import org.apache.fluss.exception.LogStorageException;
2525
import org.apache.fluss.metadata.LogFormat;
26-
import org.apache.fluss.server.exception.CorruptIndexException;
2726
import org.apache.fluss.utils.FlussPaths;
2827
import org.apache.fluss.utils.types.Tuple2;
2928

@@ -137,6 +136,7 @@ public LoadedLogOffsets load() throws IOException {
137136
*/
138137
private Tuple2<Long, Long> recoverLog() throws IOException {
139138
if (!isCleanShutdown) {
139+
long recoverLogStart = System.currentTimeMillis();
140140
List<LogSegment> unflushed =
141141
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
142142
int numUnflushed = unflushed.size();
@@ -153,46 +153,41 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
153153
numUnflushed,
154154
logSegments.getTableBucket());
155155

156+
int truncatedBytes = -1;
156157
try {
157-
segment.sanityCheck();
158-
} catch (NoSuchFileException | CorruptIndexException e) {
158+
truncatedBytes = recoverSegment(segment);
159+
} catch (InvalidOffsetException e) {
160+
long startOffset = segment.getBaseOffset();
159161
LOG.warn(
160-
"Found invalid index file corresponding log file {} for bucket {}, "
161-
+ "recovering segment and rebuilding index files...",
162-
segment.getFileLogRecords().file().getAbsoluteFile(),
162+
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
163+
+ "and creating an empty one with starting offset {}",
163164
logSegments.getTableBucket(),
164-
e);
165-
166-
int truncatedBytes = -1;
167-
try {
168-
truncatedBytes = recoverSegment(segment);
169-
} catch (InvalidOffsetException invalidOffsetException) {
170-
long startOffset = segment.getBaseOffset();
171-
LOG.warn(
172-
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
173-
+ "and creating an empty one with starting offset {}",
174-
logSegments.getTableBucket(),
175-
startOffset);
176-
truncatedBytes = segment.truncateTo(startOffset);
177-
}
165+
startOffset);
166+
truncatedBytes = segment.truncateTo(startOffset);
167+
}
178168

179-
if (truncatedBytes > 0) {
180-
// we had an invalid message, delete all remaining log
181-
LOG.warn(
182-
"Corruption found in segment {} for bucket {}, truncating to offset {}",
183-
segment.getBaseOffset(),
184-
logSegments.getTableBucket(),
185-
segment.readNextOffset());
186-
removeAndDeleteSegments(unflushedIter);
187-
truncated = true;
188-
}
169+
if (truncatedBytes > 0) {
170+
// we had an invalid message, delete all remaining log
171+
LOG.warn(
172+
"Corruption found in segment {} for bucket {}, truncating to offset {}",
173+
segment.getBaseOffset(),
174+
logSegments.getTableBucket(),
175+
segment.readNextOffset());
176+
removeAndDeleteSegments(unflushedIter);
177+
truncated = true;
178+
} else {
179+
numFlushed += 1;
189180
}
190-
numFlushed += 1;
191181
}
182+
long recoverLogEnd = System.currentTimeMillis();
183+
LOG.info(
184+
"Log recovery completed for bucket {} in {} ms",
185+
logSegments.getTableBucket(),
186+
recoverLogEnd - recoverLogStart);
192187
}
193188

194-
// TODO truncate log to recover maybe unflush segments.
195189
if (logSegments.isEmpty()) {
190+
// TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
196191
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
197192
}
198193
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
@@ -294,6 +289,20 @@ private void loadSegmentFiles() throws IOException {
294289
long baseOffset = FlussPaths.offsetFromFile(file);
295290
LogSegment segment =
296291
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
292+
293+
try {
294+
segment.sanityCheck();
295+
} catch (NoSuchFileException e) {
296+
if (isCleanShutdown
297+
|| segment.getBaseOffset() < recoveryPointCheckpoint) {
298+
LOG.error(
299+
"Could not find offset index file corresponding to log file {} "
300+
+ "for bucket {}, recovering segment and rebuilding index files...",
301+
logSegments.getTableBucket(),
302+
segment.getFileLogRecords().file().getAbsoluteFile());
303+
}
304+
recoverSegment(segment);
305+
}
297306
logSegments.add(segment);
298307
}
299308
}

fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,15 +180,17 @@ public void sanityCheck() throws IOException {
180180
+ lazyOffsetIndex.file().getAbsolutePath()
181181
+ " does not exist.");
182182
}
183-
lazyOffsetIndex.get().sanityCheck();
184183

185184
if (!lazyTimeIndex.file().exists()) {
186185
throw new NoSuchFileException(
187186
"Time index file "
188187
+ lazyTimeIndex.file().getAbsolutePath()
189188
+ " does not exist.");
190189
}
191-
lazyTimeIndex.get().sanityCheck();
190+
191+
// Sanity checks for time index and offset index are skipped because
192+
// we will recover the segments above the recovery point in recoverLog()
193+
// in any case so sanity checking them here is redundant.
192194
}
193195

194196
/**

fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java

Lines changed: 7 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,6 @@ void testInvalidOffsetRebuild() throws Exception {
268268
corruptSegment.getFileLogRecords().append(memoryLogRecords);
269269
logTablet.close();
270270

271-
// delete the index file to trigger the recovery
272-
corruptSegment.offsetIndex().deleteIfExists();
273-
274271
logTablet = createLogTablet(false);
275272
// the corrupt segment should be truncated to base offset
276273
assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
@@ -308,9 +305,6 @@ void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws Exception
308305
.sorted())
309306
.containsExactly(2L, 4L);
310307

311-
// Delete all offset index files to trigger segment recover
312-
deleteAllOffsetIndexFile(log);
313-
314308
log = createLogTablet(false);
315309
assertThat(
316310
WriterStateManager.listSnapshotFiles(logDir).stream()
@@ -408,7 +402,7 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
408402
// after this step, the segments should be
409403
// [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
410404
// writer id 1 2 1 2 1 2
411-
// se1 id 0 0 1 1 2 2
405+
// seq id 0 0 1 1 2 2
412406
// snapshot 1 2 3 4 5 6
413407
for (int i = 0; i <= 5; i++) {
414408
if (i % 2 == 0) {
@@ -473,22 +467,19 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
473467
long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4);
474468
assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint);
475469

476-
// 1. Test unclean shut without any recovery
470+
// 1. Test unclean shutdown
477471
// Retain snapshots for the last 2 segments (delete snapshots before that)
478472
long snapshotRetentionOffset = segmentOffsets.get(segmentOffsets.size() - 2);
479473
log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset);
480474
log.close();
481475

482-
// Reopen the log with recovery point. Although we use unclean shutdown here,
483-
// all the index files are correctly close, so Fluss will not trigger recover for any
484-
// segment.
476+
// Reopen the log with recovery point. Since we use unclean shutdown here,
477+
// all the log segments after recovery point will be recovered.
485478
log = createLogTablet(false, recoveryPoint);
486479

487-
// Expected snapshot offsets: last 2 segment base offsets + log end offset
488-
List<Long> lastTowSegmentOffsets =
489-
segmentOffsets.subList(
490-
Math.max(0, segmentOffsets.size() - 2), segmentOffsets.size());
491-
expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets);
480+
// Expected snapshot offsets: segments base offset after recovery point
481+
expectedSnapshotOffsets =
482+
new HashSet<>(segmentOffsets.subList((int) recoveryPoint, segmentOffsets.size()));
492483
expectedSnapshotOffsets.add(log.localLogEndOffset());
493484

494485
// Verify that expected snapshot offsets exist
@@ -509,71 +500,6 @@ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
509500
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
510501
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
511502
log.close();
512-
513-
// 2. Test unclean shut down without recover segment will rebuild writer state
514-
// Delete all snapshot files
515-
deleteAllWriterSnapshot(logDir);
516-
517-
// Reopen the log with recovery point. Although Fluss will not trigger recover for any
518-
// segment, but the writer state should be rebuilt.
519-
log = createLogTablet(false, recoveryPoint);
520-
521-
actualSnapshotOffsets =
522-
WriterStateManager.listSnapshotFiles(logDir).stream()
523-
.map(snapshotFile -> snapshotFile.offset)
524-
.collect(Collectors.toSet());
525-
// Will rebuild writer state for all segments, but only take snapshot for the last 2
526-
// segments and the last offset
527-
expectedSnapshotOffsets = new HashSet<>();
528-
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 2));
529-
expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 1));
530-
expectedSnapshotOffsets.add(log.localLogEndOffset());
531-
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
532-
533-
// Verify that expected writers last batch sequence
534-
actualWritersLastBatchSequence =
535-
log.writerStateManager().activeWriters().entrySet().stream()
536-
.collect(
537-
Collectors.toMap(
538-
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
539-
expectedWritersLastBatchSequence = new HashMap<>();
540-
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
541-
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
542-
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
543-
log.close();
544-
545-
// 3. Test unclean shut down with recover segment will rebuild writer state for each segment
546-
// after recovery point.
547-
// Delete all snapshot files and index files (to trigger segment recover)
548-
deleteAllWriterSnapshot(logDir);
549-
deleteAllOffsetIndexFile(log);
550-
551-
// Reopen the log with recovery point. All segments will be recovered since we delete all
552-
// the index files
553-
log = createLogTablet(false, recoveryPoint);
554-
555-
// Writer snapshot files should be rebuilt for each segment after recovery point
556-
actualSnapshotOffsets =
557-
WriterStateManager.listSnapshotFiles(logDir).stream()
558-
.map(snapshotFile -> snapshotFile.offset)
559-
.collect(Collectors.toSet());
560-
expectedSnapshotOffsets =
561-
log.logSegments(recoveryPoint, log.localLogEndOffset()).stream()
562-
.map(LogSegment::getBaseOffset)
563-
.collect(Collectors.toSet());
564-
expectedSnapshotOffsets.add(log.localLogEndOffset());
565-
assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
566-
567-
actualWritersLastBatchSequence =
568-
log.writerStateManager().activeWriters().entrySet().stream()
569-
.collect(
570-
Collectors.toMap(
571-
Map.Entry::getKey, e -> e.getValue().lastBatchSequence()));
572-
expectedWritersLastBatchSequence = new HashMap<>();
573-
expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
574-
expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
575-
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
576-
log.close();
577503
}
578504

579505
@Test
@@ -690,11 +616,4 @@ private void deleteAllOffsetIndexFile(LogTablet logTablet) throws IOException {
690616
segment.offsetIndex().deleteIfExists();
691617
}
692618
}
693-
694-
private void deleteAllWriterSnapshot(File logDir) throws Exception {
695-
List<SnapshotFile> snapshotFiles = WriterStateManager.listSnapshotFiles(logDir);
696-
for (SnapshotFile snapshotFile : snapshotFiles) {
697-
snapshotFile.deleteIfExists();
698-
}
699-
}
700619
}

0 commit comments

Comments
 (0)