diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java index 77cd5ac35a8b1..ac49ea1d3e29b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java @@ -95,14 +95,12 @@ private static BlobServer createTestInstance( } @Test - void testTransientBlobNoJobCleanup() - throws IOException, InterruptedException, ExecutionException { + void testTransientBlobNoJobCleanup() throws Exception { testTransientBlobCleanup(null); } @Test - void testTransientBlobForJobCleanup() - throws IOException, InterruptedException, ExecutionException { + void testTransientBlobForJobCleanup() throws Exception { testTransientBlobCleanup(new JobID()); } @@ -110,8 +108,7 @@ void testTransientBlobForJobCleanup() * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are * constantly accessed. */ - private void testTransientBlobCleanup(@Nullable final JobID jobId) - throws IOException, InterruptedException, ExecutionException { + private void testTransientBlobCleanup(@Nullable final JobID jobId) throws Exception { // 1s should be a safe-enough buffer to still check for existence after a BLOB's last access long cleanupInterval = 1L; // in seconds @@ -150,9 +147,11 @@ private void testTransientBlobCleanup(@Nullable final JobID jobId) final JobID jobIdHA = (jobId == null) ? new JobID() : jobId; final BlobKey keyHA = put(server, jobIdHA, data, PERMANENT_BLOB); - // access key1, verify expiry times (delay at least 1ms to also verify key2 expiry is - // unchanged) - Thread.sleep(1); + // access key1, verify expiry times (wait until wall-clock advances so we can assert + // key2 expiry is unchanged even on slow or contended CI hosts) + final long afterKey2PutMillis = System.currentTimeMillis(); + CommonTestUtils.waitUntilCondition( + () -> System.currentTimeMillis() > afterKey2PutMillis, 1L); cleanupLowerBound = System.currentTimeMillis() + cleanupInterval; verifyContents(server, jobId, key1, data); final Long key1ExpiryAfterGet = transientBlobExpiryTimes.get(Tuple2.of(jobId, key1)); @@ -161,9 +160,11 @@ private void testTransientBlobCleanup(@Nullable final JobID jobId) assertThat(key2ExpiryAfterPut) .isEqualTo(transientBlobExpiryTimes.get(Tuple2.of(jobId, key2))); - // access key2, verify expiry times (delay at least 1ms to also verify key1 expiry is - // unchanged) - Thread.sleep(1); + // access key2, verify expiry times (wait until wall-clock advances so we can assert + // key1 expiry is unchanged even on slow or contended CI hosts) + final long afterKey1AccessMillis = System.currentTimeMillis(); + CommonTestUtils.waitUntilCondition( + () -> System.currentTimeMillis() > afterKey1AccessMillis, 1L); cleanupLowerBound = System.currentTimeMillis() + cleanupInterval; verifyContents(server, jobId, key2, data2); assertThat(key1ExpiryAfterGet)