Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,25 @@ public void setup() throws IOException, RemoteStorageException {
}

@AfterEach
public void cleanup() throws InterruptedException {
public void cleanup() throws Exception {
reset(rsm);
// the files created for the test will be deleted automatically on thread exit since we use temp dir
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
// best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
// will be deleted at the end of test.
// Close the cache directly instead of using closeQuietly to ensure any exception during close
// (e.g. failure to shut down the cleaner scheduler) is properly propagated rather than silently swallowed,
// which could leave the cleaner thread alive and cause the leaked thread assertion below to fail.
try {
Utils.delete(logDir);
} catch (IOException ioe) {
// ignore
cache.close();
} finally {
// best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
// will be deleted at the end of test.
try {
Utils.delete(logDir);
} catch (IOException ioe) {
// ignore
}
// Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
// because this may throw an exception and prevent cleanup after it
TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true);
}
// Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
// because this may throw an exception and prevent cleanup after it
TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true);
}

@Test
Expand Down Expand Up @@ -1044,6 +1049,11 @@ public void testConcurrentCacheDeletedFileExists() throws InterruptedException,
"Time index file should not be present on disk at " + remoteIndexCacheDir.toPath());
TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
"Index file marked for deletion should not be present on disk at " + remoteIndexCacheDir.toPath());

// Wait for all scheduled cleanup tasks to complete so the cleaner thread is idle before @AfterEach
// shuts it down. This prevents a race between an in-flight task and scheduler shutdown.
TestUtils.waitForCondition(() -> cache.expiredIdxPendingForDeletion() == 0,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼

"Expired index entries should have been cleaned up");
}

@Test
Expand All @@ -1065,11 +1075,12 @@ public void testDeleteInvalidIndexFilesOnInit() throws IOException {
OffsetIndex validOffsetIdx = createOffsetIndexForSegmentMetadata(rlsMetadata, logDir);
TransactionIndex validTimeIdx = createTxIndexForSegmentMetadata(rlsMetadata, logDir);

new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString());
assertFalse(invalidOffsetIdxFile.exists());
assertFalse(invalidTimeIndexFile.exists());
assertTrue(validOffsetIdx.file().exists());
assertTrue(validTimeIdx.file().exists());
try (RemoteIndexCache secondCache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString())) {
assertFalse(invalidOffsetIdxFile.exists());
assertFalse(invalidTimeIndexFile.exists());
assertTrue(validOffsetIdx.file().exists());
assertTrue(validTimeIdx.file().exists());
}
}

@Test
Expand Down