From a7cc3e17ec7f2e7bc741ab007085614d01e4d793 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Fri, 12 Jun 2026 12:15:47 +0200 Subject: [PATCH] [FLINK-38534][runtime/tests] Fix LocalRecoveryTest racing the async task deployment The test forces executions to RUNNING while the scheduler is still creating task deployment descriptors; deployment is then rejected, the job restarts, and the manual checkpoint never registers. Wait for descriptor creation before forcing RUNNING, keyed off the descriptor future because executions are observable in DEPLOYING before the future is assigned. The earlier FLINK-38534 fix waited for RUNNING after the forced transition and misses this race. Generated-by: Claude Opus 4.8 (1M context) --- .../scheduler/SchedulerTestingUtils.java | 32 +++++++++++++++++++ .../scheduler/adaptive/LocalRecoveryTest.java | 16 +++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index 49a81d6fd2de4..1257f5a128592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -362,6 +362,38 @@ public static void waitForAllTasksRunning(final ExecutionGraph executionGraph) RETRY_ATTEMPTS); } + /** + * Waits until the task deployment descriptor of every current execution has been created. Call + * this before forcing executions into a later state; deployment is rejected once an execution + * leaves DEPLOYING. + * + *

Unlike {@code ExecutionUtils#waitForTaskDeploymentDescriptorsCreation}, this may run + * concurrently with {@code Execution.deploy()}: executions are observable in DEPLOYING before + * the descriptor future is assigned, so the {@code IllegalStateException} thrown while the + * future is unset is treated as "not yet ready". + * + * @param executionGraph the ExecutionGraph to wait for + * @throws Exception if the condition is not met within the timeout period + */ + public static void waitForAllTasksDeploymentDescriptorsCreated( + final ExecutionGraph executionGraph) throws Exception { + for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { + waitUntilCondition( + () -> { + try { + vertex.getCurrentExecutionAttempt() + .getTddCreationDuringDeployFuture() + .join(); + return true; + } catch (IllegalStateException deploymentNotStartedYet) { + return false; + } + }, + RETRY_INTERVAL_MILLIS, + RETRY_ATTEMPTS); + } + } + private static ExecutionJobVertex getJobVertex( DefaultScheduler scheduler, JobVertexID jobVertexId) { final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java index bdaf69c3f31af..4227afa3bf32a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java @@ -56,6 +56,7 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksDeploymentDescriptorsCreated; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksRunning; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint; @@ -103,11 +104,6 @@ void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { // Transition job and all subtasks to RUNNING state. waitForJobStatusRunning(scheduler); - runInMainThread(() -> setAllExecutionsToRunning(scheduler)); - // Wait for all task executions to actually reach RUNNING state before triggering - // checkpoint. - // In slower CI environments, state transitions may not complete immediately, causing - // checkpoint triggers to be rejected if tasks are still in DEPLOYING/INITIALIZING state. final ExecutionGraph executionGraph = scheduler .getState() @@ -115,6 +111,16 @@ void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { .map(StateWithExecutionGraph::getExecutionGraph) .orElseThrow( () -> new IllegalStateException("ExecutionGraph not available")); + + // Forcing RUNNING while the asynchronous descriptor creation is still in flight fails the + // deployment and restarts the job, so the checkpoint below would never register. + waitForAllTasksDeploymentDescriptorsCreated(executionGraph); + + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + // Wait for all task executions to actually reach RUNNING state before triggering + // checkpoint. + // In slower CI environments, state transitions may not complete immediately, causing + // checkpoint triggers to be rejected if tasks are still in DEPLOYING/INITIALIZING state. waitForAllTasksRunning(executionGraph); // Trigger a checkpoint