diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java index dc9c07839c96b..78c241df618fd 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java @@ -127,20 +127,25 @@ public void setup() throws IOException, RemoteStorageException { } @AfterEach - public void cleanup() throws InterruptedException { + public void cleanup() throws Exception { reset(rsm); - // the files created for the test will be deleted automatically on thread exit since we use temp dir - Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); - // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory - // will be deleted at the end of test. + // Close the cache directly instead of using closeQuietly to ensure any exception during close + // (e.g. failure to shut down the cleaner scheduler) is properly propagated rather than silently swallowed, + // which could leave the cleaner thread alive and cause the leaked thread assertion below to fail. try { - Utils.delete(logDir); - } catch (IOException ioe) { - // ignore + cache.close(); + } finally { + // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory + // will be deleted at the end of test. + try { + Utils.delete(logDir); + } catch (IOException ioe) { + // ignore + } + // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach + // because this may throw an exception and prevent cleanup after it + TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true); } - // Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach - // because this may throw an exception and prevent cleanup after it - TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, true); } @Test @@ -1044,6 +1049,11 @@ public void testConcurrentCacheDeletedFileExists() throws InterruptedException, "Time index file should not be present on disk at " + remoteIndexCacheDir.toPath()); TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(), "Index file marked for deletion should not be present on disk at " + remoteIndexCacheDir.toPath()); + + // Wait for all scheduled cleanup tasks to complete so the cleaner thread is idle before @AfterEach + // shuts it down. This prevents a race between an in-flight task and scheduler shutdown. + TestUtils.waitForCondition(() -> cache.expiredIdxPendingForDeletion() == 0, + "Expired index entries should have been cleaned up"); } @Test @@ -1065,11 +1075,12 @@ public void testDeleteInvalidIndexFilesOnInit() throws IOException { OffsetIndex validOffsetIdx = createOffsetIndexForSegmentMetadata(rlsMetadata, logDir); TransactionIndex validTimeIdx = createTxIndexForSegmentMetadata(rlsMetadata, logDir); - new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString()); - assertFalse(invalidOffsetIdxFile.exists()); - assertFalse(invalidTimeIndexFile.exists()); - assertTrue(validOffsetIdx.file().exists()); - assertTrue(validTimeIdx.file().exists()); + try (RemoteIndexCache secondCache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString())) { + assertFalse(invalidOffsetIdxFile.exists()); + assertFalse(invalidTimeIndexFile.exists()); + assertTrue(validOffsetIdx.file().exists()); + assertTrue(validTimeIdx.file().exists()); + } } @Test