Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,38 @@ public static void waitForAllTasksRunning(final ExecutionGraph executionGraph)
RETRY_ATTEMPTS);
}

/**
* Waits until the task deployment descriptor of every current execution has been created. Call
* this before forcing executions into a later state; deployment is rejected once an execution
* leaves DEPLOYING.
*
* <p>Unlike {@code ExecutionUtils#waitForTaskDeploymentDescriptorsCreation}, this may run
* concurrently with {@code Execution.deploy()}: executions are observable in DEPLOYING before
* the descriptor future is assigned, so the {@code IllegalStateException} thrown while the
* future is unset is treated as "not yet ready".
*
* @param executionGraph the ExecutionGraph to wait for
* @throws Exception if the condition is not met within the timeout period
*/
public static void waitForAllTasksDeploymentDescriptorsCreated(
final ExecutionGraph executionGraph) throws Exception {
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
waitUntilCondition(
() -> {
try {
vertex.getCurrentExecutionAttempt()
.getTddCreationDuringDeployFuture()
.join();
return true;
} catch (IllegalStateException deploymentNotStartedYet) {
return false;
}
},
RETRY_INTERVAL_MILLIS,
RETRY_ATTEMPTS);
}
}

private static ExecutionJobVertex getJobVertex(
DefaultScheduler scheduler, JobVertexID jobVertexId) {
final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksDeploymentDescriptorsCreated;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForAllTasksRunning;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
Expand Down Expand Up @@ -103,18 +104,23 @@ void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception {

// Transition job and all subtasks to RUNNING state.
waitForJobStatusRunning(scheduler);
runInMainThread(() -> setAllExecutionsToRunning(scheduler));
// Wait for all task executions to actually reach RUNNING state before triggering
// checkpoint.
// In slower CI environments, state transitions may not complete immediately, causing
// checkpoint triggers to be rejected if tasks are still in DEPLOYING/INITIALIZING state.
final ExecutionGraph executionGraph =
scheduler
.getState()
.as(StateWithExecutionGraph.class)
.map(StateWithExecutionGraph::getExecutionGraph)
.orElseThrow(
() -> new IllegalStateException("ExecutionGraph not available"));

// Forcing RUNNING while the asynchronous descriptor creation is still in flight fails the
// deployment and restarts the job, so the checkpoint below would never register.
waitForAllTasksDeploymentDescriptorsCreated(executionGraph);

runInMainThread(() -> setAllExecutionsToRunning(scheduler));
// Wait for all task executions to actually reach RUNNING state before triggering
// checkpoint.
// In slower CI environments, state transitions may not complete immediately, causing
// checkpoint triggers to be rejected if tasks are still in DEPLOYING/INITIALIZING state.
waitForAllTasksRunning(executionGraph);

// Trigger a checkpoint
Expand Down