From 046bade5bbf650620a84b7e2bff89da5e8ab779c Mon Sep 17 00:00:00 2001 From: Yi Zhang Date: Wed, 18 Mar 2026 15:49:44 +0800 Subject: [PATCH 1/2] [FLINK-39309][runtime] Check terminated applications for duplication --- .../clusterframework/ApplicationStatus.java | 25 ++ .../flink/runtime/dispatcher/Dispatcher.java | 273 +++++++++++------- ...tractThreadsafeApplicationResultStore.java | 10 + .../ApplicationResultStore.java | 17 ++ .../EmbeddedApplicationResultStore.java | 10 + .../FileSystemApplicationResultStore.java | 17 ++ .../ApplicationStatusTest.java | 29 ++ .../dispatcher/DispatcherApplicationTest.java | 117 ++++++++ .../ApplicationResultStoreContractTest.java | 50 ++++ ...temApplicationResultStoreContractTest.java | 23 ++ .../TestingApplicationResultStore.java | 29 +- 11 files changed, 499 insertions(+), 101 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java index f50374712747a..94707eb6c7e64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobStatus; import org.apache.flink.shaded.guava33.com.google.common.collect.BiMap; @@ -89,4 +90,28 @@ public JobStatus deriveJobStatus() { return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(this); } + + /** + * Derives the ApplicationStatus that corresponds to the given ApplicationState. If the + * ApplicationState is not a terminal state, this method returns {@link #UNKNOWN}. + * + *

Note: {@code ApplicationState} covers the entire lifecycle of an application, representing + * various stages from created to finish. {@code ApplicationStatus}, on the other hand, + * describes the final state of the cluster when shutdown. + * + * @param applicationState the application state + * @return the corresponding status + */ + public static ApplicationStatus fromApplicationState(ApplicationState applicationState) { + switch (applicationState) { + case FAILED: + return FAILED; + case CANCELED: + return CANCELED; + case FINISHED: + return SUCCEEDED; + default: + return UNKNOWN; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index d5394704e327e..1c6c8d5bc163b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.WebOptions; @@ -483,52 +484,14 @@ public void onStart() throws Exception { this.getRpcService().getScheduledExecutor(), this::onFatalError); + startApplicationsCleanup(); + if (dispatcherBootstrap instanceof ApplicationBootstrap) { // Application Mode - checkState(suspendedApplications.isEmpty()); - checkState(recoveredDirtyApplicationResults.size() <= 1); - - AbstractApplication application = - ((ApplicationBootstrap) dispatcherBootstrap).getApplication(); - if (!recoveredDirtyApplicationResults.isEmpty()) { - // the application is already terminated - ApplicationResult applicationResult = - recoveredDirtyApplicationResults.iterator().next(); - checkState( - application - .getApplicationId() - .equals(applicationResult.getApplicationId())); - - startApplicationCleanup(); - } else { - // defer starting recovered jobs, as they might be skipped based on user logic - internalSubmitApplication(application).get(); - } + maybeSubmitApplicationInApplicationMode(); } else { // Session Mode - startApplicationCleanup(); - - // start suspended applications - for (AbstractApplication suspendedApplication : suspendedApplications.values()) { - // defer starting recovered jobs, as they might be skipped based on user logic - internalSubmitApplication(suspendedApplication).get(); - } - - // start suspended jobs that do not belong to any application (previously submitted in a - // SingleJobApplication) by wrapping them into a SingleJobApplication - Iterator> jobIterator = - suspendedJobs.entrySet().iterator(); - while (jobIterator.hasNext()) { - Map.Entry entry = jobIterator.next(); - ExecutionPlan recoveredJob = entry.getValue(); - ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null); - - if (!suspendedApplications.containsKey(applicationId)) { - runRecoveredJob(recoveredJob, true); - jobIterator.remove(); - suspendedJobIdsByApplicationId.remove(applicationId); - } - } + recoverApplicationsAndJobsInSessionMode(); } checkState(recoveredDirtyJobResultsByApplicationId.isEmpty()); @@ -673,49 +636,145 @@ private void runJobWithCleanupRunner( } } - private void startApplicationCleanup() { + private void startApplicationsCleanup() { for (ApplicationResult applicationResult : recoveredDirtyApplicationResults) { - ApplicationID applicationId = applicationResult.getApplicationId(); - ApplicationState applicationState = applicationResult.getApplicationState(); - - Map jobs = new HashMap<>(); - Collection dirtyJobResults = - recoveredDirtyJobResultsByApplicationId.remove(applicationId); - if (dirtyJobResults != null) { - for (JobResult jobResult : dirtyJobResults) { - JobID jobId = jobResult.getJobId(); - ExecutionGraphInfo executionGraphInfo = - new ExecutionGraphInfo( - ArchivedExecutionGraph.createSparseArchivedExecutionGraph( - jobResult, -1)); - jobs.put(jobId, executionGraphInfo); - - runJobWithCleanupRunner(jobResult, false); - } + try { + startApplicationCleanup(applicationResult); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not start cleanup for application %s.", + applicationResult.getApplicationId()), + throwable)); } + } + } - long[] stateTimestamps = new long[ApplicationState.values().length]; - stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime(); - stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime(); + private void startApplicationCleanup(ApplicationResult applicationResult) { + ApplicationID applicationId = applicationResult.getApplicationId(); + ApplicationState applicationState = applicationResult.getApplicationState(); - ArchivedApplication sparseArchivedApplication = - new ArchivedApplication( - applicationId, - applicationResult.getApplicationName(), - applicationState, - stateTimestamps, - jobs, - Collections.emptyList()); + Map jobs = new HashMap<>(); + Collection dirtyJobResults = + recoveredDirtyJobResultsByApplicationId.remove(applicationId); + if (dirtyJobResults != null) { + for (JobResult jobResult : dirtyJobResults) { + JobID jobId = jobResult.getJobId(); + ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo( + ArchivedExecutionGraph.createSparseArchivedExecutionGraph( + jobResult, -1)); + jobs.put(jobId, executionGraphInfo); + + runJobWithCleanupRunner(jobResult, false); + } + } - writeToArchivedApplicationStore(sparseArchivedApplication); + long[] stateTimestamps = new long[ApplicationState.values().length]; + stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime(); + stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime(); + + ArchivedApplication sparseArchivedApplication = + new ArchivedApplication( + applicationId, + applicationResult.getApplicationName(), + applicationState, + stateTimestamps, + jobs, + Collections.emptyList()); + + writeToArchivedApplicationStore(sparseArchivedApplication); + + // the dirty result already exists + // create a completed future to make sure the jobs can be marked clean + applicationCreateDirtyResultFutures.put(applicationId, FutureUtils.completedVoidFuture()); + applicationTerminationFutures.put(applicationId, new CompletableFuture<>()); - // the dirty result already exists - // create a completed future to make sure the jobs can be marked clean - applicationCreateDirtyResultFutures.put( - applicationId, FutureUtils.completedVoidFuture()); - applicationTerminationFutures.put(applicationId, new CompletableFuture<>()); + removeApplication(applicationId, jobs.keySet()); + } - removeApplication(applicationId, jobs.keySet()); + private void recoverApplicationsAndJobsInSessionMode() { + for (AbstractApplication suspendedApplication : suspendedApplications.values()) { + // defer starting recovered jobs, as they might be skipped based on user logic + try { + internalSubmitApplication(suspendedApplication).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not start recovered application %s.", + suspendedApplication.getApplicationId()), + throwable)); + } + } + + // start suspended jobs that do not belong to any application (previously submitted in a + // SingleJobApplication) by wrapping them into a SingleJobApplication + Iterator> jobIterator = suspendedJobs.entrySet().iterator(); + while (jobIterator.hasNext()) { + Map.Entry entry = jobIterator.next(); + ExecutionPlan recoveredJob = entry.getValue(); + ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null); + if (!suspendedApplications.containsKey(applicationId)) { + runRecoveredJob(recoveredJob, true); + jobIterator.remove(); + suspendedJobIdsByApplicationId.remove(applicationId); + } + } + } + + private void maybeSubmitApplicationInApplicationMode() { + checkState(suspendedApplications.isEmpty()); + checkState(recoveredDirtyApplicationResults.size() <= 1); + + AbstractApplication application = + ((ApplicationBootstrap) dispatcherBootstrap).getApplication(); + ApplicationID applicationId = application.getApplicationId(); + boolean shutDownOnApplicationFinish = + configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH); + if (!recoveredDirtyApplicationResults.isEmpty()) { + // the application is already terminated but needs to be cleaned up + ApplicationResult applicationResult = + recoveredDirtyApplicationResults.iterator().next(); + checkState(applicationId.equals(applicationResult.getApplicationId())); + + if (shutDownOnApplicationFinish) { + shutDownCluster( + ApplicationStatus.fromApplicationState( + applicationResult.getApplicationState())); + } + } else { + // check whether the application is already cleaned up + ApplicationResult applicationResult = null; + try { + applicationResult = + applicationResultStore.getCleanApplicationResultAsync(applicationId).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format( + "Could not get clean application result for application %s.", + applicationId), + throwable)); + } + + if (applicationResult == null) { + try { + internalSubmitApplication(application).get(); + } catch (Throwable throwable) { + onFatalError( + new DispatcherException( + String.format("Could not start application %s.", applicationId), + throwable)); + } + } else { + if (shutDownOnApplicationFinish) { + shutDownCluster( + ApplicationStatus.fromApplicationState( + applicationResult.getApplicationState())); + } + } } } @@ -873,29 +932,45 @@ public CompletableFuture submitApplication( final ApplicationID applicationId = application.getApplicationId(); log.info( "Received application submission '{}' ({}).", application.getName(), applicationId); + return applicationResultStore + .hasApplicationResultEntryAsync(applicationId) + .thenComposeAsync( + isTerminated -> { + if (isTerminated) { + log.warn( + "Ignoring application submission '{}' ({}) because the application already " + + "reached a terminal state.", + application.getName(), + applicationId); + return FutureUtils.completedExceptionally( + new DuplicateApplicationSubmissionException(applicationId)); + } else if (applications.containsKey(applicationId) + || archivedApplicationStore.get(applicationId).isPresent()) { + log.warn("Application with id {} already exists.", applicationId); + return FutureUtils.completedExceptionally( + new DuplicateApplicationSubmissionException(applicationId)); + } - if (applications.containsKey(applicationId)) { - log.warn("Application with id {} already exists.", applicationId); - throw new CompletionException( - new DuplicateApplicationSubmissionException(applicationId)); - } - - Optional optionalApplicationStoreEntry = - application.getApplicationStoreEntry(); - if (optionalApplicationStoreEntry.isPresent()) { - try { - applicationWriter.putApplication(optionalApplicationStoreEntry.get()); - } catch (Exception e) { - String msg = - String.format( - "Could not persist application %s to the ApplicationStore.", - applicationId); - log.warn(msg); - throw new CompletionException(new RuntimeException(msg, e)); - } - } + Optional optionalApplicationStoreEntry = + application.getApplicationStoreEntry(); + if (optionalApplicationStoreEntry.isPresent()) { + try { + applicationWriter.putApplication( + optionalApplicationStoreEntry.get()); + } catch (Exception e) { + String msg = + String.format( + "Could not persist application %s to the ApplicationStore.", + applicationId); + log.warn(msg, e); + return FutureUtils.completedExceptionally( + new RuntimeException(msg, e)); + } + } - return internalSubmitApplication(application); + return internalSubmitApplication(application); + }, + getMainThreadExecutor()); } /** This method must be called from the main thread. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java index 5fbdacff80707..4f539dd7c1608 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java @@ -128,6 +128,16 @@ public Set getDirtyResults() throws IOException { @GuardedBy("readWriteLock") protected abstract Set getDirtyResultsInternal() throws IOException; + @Override + public CompletableFuture getCleanApplicationResultAsync( + ApplicationID applicationId) { + return withReadLockAsync(() -> getCleanApplicationResultInternal(applicationId)); + } + + @GuardedBy("readWriteLock") + protected abstract ApplicationResult getCleanApplicationResultInternal( + ApplicationID applicationId) throws IOException; + private CompletableFuture withWriteLockAsync(ThrowingRunnable runnable) { return FutureUtils.runAsync( () -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java index 5606e8e8169e9..8e48b84d0e2c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java @@ -109,4 +109,21 @@ default CompletableFuture hasApplicationResultEntryAsync(ApplicationID * @throws IOException if collecting the set of dirty results failed for IO reasons. */ Set getDirtyResults() throws IOException; + + /** + * Asynchronously gets the persisted {@link ApplicationResult} instance that is marked as {@code + * clean} for the given {@code ApplicationID}. + * + *

This method is used to determine whether the application has already completed and been + * cleaned up, thereby avoiding duplicate execution. Note that it only works when {@link + * ApplicationResultStoreOptions#DELETE_ON_COMMIT} is set to {@code false}; otherwise clean + * entries are deleted upon commit and cannot be retrieved. + * + * @param applicationId Ident of the application we wish to get. + * @return a {@link CompletableFuture} that completes with the {@link ApplicationResult} if a + * clean entry exists for the given {@code applicationId}; otherwise a successfully + * completed future with {@code null}. + */ + CompletableFuture getCleanApplicationResultAsync( + ApplicationID applicationId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java index af987d6dba3aa..3ac50ebb3523e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java @@ -75,6 +75,16 @@ protected Set getDirtyResultsInternal() { .collect(Collectors.toSet()); } + @Override + protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) { + final ApplicationResultEntry entry = cleanResults.get(applicationId); + if (entry == null) { + return null; + } + + return entry.getApplicationResult(); + } + /** Clears all stored results. */ public void clear() { dirtyResults.clear(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java index fd938fede9b15..b4832110fad5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java @@ -230,6 +230,23 @@ public Set getDirtyResultsInternal() throws IOException { return dirtyResults; } + @Override + protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) + throws IOException { + if (deleteOnCommit) { + return null; + } + + Path cleanPath = constructCleanPath(applicationId); + if (!fileSystem.exists(cleanPath)) { + return null; + } + + JsonApplicationResultEntry jre = + mapper.readValue(fileSystem.open(cleanPath), JsonApplicationResultEntry.class); + return jre.getApplicationResult(); + } + /** * Wrapper class around {@link ApplicationResultEntry} to allow for serialization of a schema * version, so that future schema changes can be handled in a backwards compatible manner. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java index 507c0a4fe2675..7e4620b67a7bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobStatus; import org.junit.jupiter.api.Test; @@ -115,6 +116,34 @@ void testUnknownApplicationStatusForMissingJobStatus() { assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN); } + @Test + void testFailedApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FAILED)) + .isEqualTo(ApplicationStatus.FAILED); + } + + @Test + void testCancelledApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.CANCELED)) + .isEqualTo(ApplicationStatus.CANCELED); + } + + @Test + void testSucceededApplicationStatusFromApplicationState() { + assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FINISHED)) + .isEqualTo(ApplicationStatus.SUCCEEDED); + } + + @ParameterizedTest + @EnumSource( + value = ApplicationState.class, + names = {"CREATED", "RUNNING", "FAILING", "CANCELING"}) + public void testUnknownApplicationStatusFromNonTerminalApplicationState( + ApplicationState applicationState) { + assertThat(ApplicationStatus.fromApplicationState(applicationState)) + .isEqualTo(ApplicationStatus.UNKNOWN); + } + private static Iterable exitCodes(Iterable statuses) { return StreamSupport.stream(statuses.spliterator(), false) .map(ApplicationStatus::processExitCode) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java index 16c8d76d0720e..dfcd7065feb1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.application.AbstractApplication; @@ -736,6 +737,122 @@ public void testDuplicateSubmissionWithRecoveredApplication() throws Exception { .hasCauseInstanceOf(DuplicateApplicationSubmissionException.class); } + @Test + public void testDuplicateSubmissionWithTerminatedButDirtyApplication() throws Exception { + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + + assertDuplicateApplicationSubmission(); + } + + @Test + public void testDuplicateSubmissionWithTerminatedAndCleanedApplication() throws Exception { + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get(); + + assertDuplicateApplicationSubmission(); + } + + private void assertDuplicateApplicationSubmission() throws Exception { + dispatcher = createTestingDispatcherBuilder().build(rpcService); + dispatcher.start(); + + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + + final AbstractApplication application = + TestingApplication.builder().setApplicationId(applicationId).build(); + + final CompletableFuture submitFuture = + dispatcherGateway.submitApplication(application, TIMEOUT); + + assertThatThrownBy(submitFuture::get) + .hasCauseInstanceOf(DuplicateApplicationSubmissionException.class); + } + + @Test + public void testApplicationBootstrapWithDirtyResultTriggersShutdown() throws Exception { + testApplicationBootstrapWithApplicationResult(false, true); + } + + @Test + public void testApplicationBootstrapWithDirtyResultDoesNotTriggerShutdownWhenDisabled() + throws Exception { + testApplicationBootstrapWithApplicationResult(false, false); + } + + @Test + public void testApplicationBootstrapWithCleanResultTriggersShutdown() throws Exception { + testApplicationBootstrapWithApplicationResult(true, true); + } + + @Test + public void testApplicationBootstrapWithCleanResultDoesNotTriggerShutdownWhenDisabled() + throws Exception { + testApplicationBootstrapWithApplicationResult(true, false); + } + + private void testApplicationBootstrapWithApplicationResult( + boolean isCleanResult, boolean triggerShutDown) throws Exception { + configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, triggerShutDown); + + final ApplicationResult applicationResult = + TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId); + haServices + .getApplicationResultStore() + .createDirtyResultAsync(new ApplicationResultEntry(applicationResult)) + .get(); + if (isCleanResult) { + haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get(); + } + + final OneShotLatch bootstrapLatch = new OneShotLatch(); + final TestingDispatcher.Builder builder = + createTestingDispatcherBuilder() + .setDispatcherBootstrapFactory( + (ignoredDispatcherGateway, + ignoredScheduledExecutor, + ignoredFatalErrorHandler) -> + new ApplicationBootstrap( + TestingApplication.builder() + .setApplicationId(applicationId) + .setExecuteFunction( + ignoredExecuteParams -> { + bootstrapLatch.trigger(); + return CompletableFuture + .completedFuture( + Acknowledge + .get()); + }) + .build())); + if (!isCleanResult) { + builder.setRecoveredDirtyApplications(Collections.singleton(applicationResult)); + } + dispatcher = builder.build(rpcService); + dispatcher.start(); + + if (triggerShutDown) { + assertEquals( + ApplicationStatus.SUCCEEDED, + dispatcher.getShutDownFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)); + } else { + assertThatThrownBy( + () -> dispatcher.getShutDownFuture().get(100L, TimeUnit.MILLISECONDS)) + .isInstanceOf(TimeoutException.class); + } + + assertFalse(bootstrapLatch.isTriggered()); + } + @Test public void testThatDirtilyFinishedApplicationsNotRetriggered() { final AbstractApplication application = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java index 803f69c792073..51aa19be5c323 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java @@ -208,4 +208,54 @@ default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException { .singleElement() .isEqualTo(otherDirtyApplicationResultEntry.getApplicationId()); } + + @Test + default void testGetCleanApplicationResultWithCleanEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNotNull(); + assertThat(result.getApplicationId()) + .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + assertThat(result.getApplicationState()) + .isEqualTo( + DUMMY_APPLICATION_RESULT_ENTRY + .getApplicationResult() + .getApplicationState()); + assertThat(result.getApplicationName()) + .isEqualTo( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationResult().getApplicationName()); + } + + @Test + default void testGetCleanApplicationResultWithNoEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } + + @Test + default void testGetCleanApplicationResultWithDirtyEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java index fb4206a089292..08b5bf139ab1d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java @@ -21,11 +21,14 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.util.concurrent.Executors; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import static org.assertj.core.api.Assertions.assertThat; + /** * Tests for the {@link FileSystemApplicationResultStore} implementation of the {@link * ApplicationResultStore}'s contracts. @@ -40,4 +43,24 @@ public ApplicationResultStore createApplicationResultStore() throws IOException return new FileSystemApplicationResultStore( path.getFileSystem(), path, false, Executors.directExecutor()); } + + @Test + void testGetCleanApplicationResultWhenDeleteOnCommitIsTrue() throws Exception { + Path path = new Path(temporaryFolder.toURI()); + FileSystemApplicationResultStore applicationResultStore = + new FileSystemApplicationResultStore( + path.getFileSystem(), path, true, Executors.directExecutor()); + + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + + final ApplicationResult result = + applicationResultStore + .getCleanApplicationResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat(result).isNull(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java index cf6dae44f0f3b..b0bfa5a440bde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java @@ -66,6 +66,8 @@ public static ApplicationResult createApplicationResult( hasCleanApplicationResultEntryFunction; private final SupplierWithException, ? extends IOException> getDirtyResultsSupplier; + private final Function> + getCleanApplicationResultFunction; private TestingApplicationResultStore( Function> createDirtyResultConsumer, @@ -76,13 +78,16 @@ private TestingApplicationResultStore( Function> hasCleanApplicationResultEntryFunction, SupplierWithException, ? extends IOException> - getDirtyResultsSupplier) { + getDirtyResultsSupplier, + Function> + getCleanApplicationResultFunction) { this.createDirtyResultConsumer = createDirtyResultConsumer; this.markResultAsCleanConsumer = markResultAsCleanConsumer; this.hasApplicationResultEntryFunction = hasApplicationResultEntryFunction; this.hasDirtyApplicationResultEntryFunction = hasDirtyApplicationResultEntryFunction; this.hasCleanApplicationResultEntryFunction = hasCleanApplicationResultEntryFunction; this.getDirtyResultsSupplier = getDirtyResultsSupplier; + this.getCleanApplicationResultFunction = getCleanApplicationResultFunction; } @Override @@ -118,6 +123,12 @@ public Set getDirtyResults() throws IOException { return getDirtyResultsSupplier.get(); } + @Override + public CompletableFuture getCleanApplicationResultAsync( + ApplicationID applicationId) { + return getCleanApplicationResultFunction.apply(applicationId); + } + public static TestingApplicationResultStore.Builder builder() { return new Builder(); } @@ -144,6 +155,12 @@ public static class Builder { private SupplierWithException, ? extends IOException> getDirtyResultsSupplier = Collections::emptySet; + private Function> + getCleanApplicationResultFunction = + applicationID -> + CompletableFuture.completedFuture( + createSuccessfulApplicationResult(applicationID)); + public Builder withCreateDirtyResultConsumer( Function> createDirtyResultConsumer) { @@ -185,6 +202,13 @@ public Builder withGetDirtyResultsSupplier( return this; } + public Builder withGetCleanApplicationResultFunction( + Function> + getCleanApplicationResultFunction) { + this.getCleanApplicationResultFunction = getCleanApplicationResultFunction; + return this; + } + public TestingApplicationResultStore build() { return new TestingApplicationResultStore( createDirtyResultConsumer, @@ -192,7 +216,8 @@ public TestingApplicationResultStore build() { hasApplicationResultEntryFunction, hasDirtyApplicationResultEntryFunction, hasCleanApplicationResultEntryFunction, - getDirtyResultsSupplier); + getDirtyResultsSupplier, + getCleanApplicationResultFunction); } } } From 8793872b0f991e0b8d55e1fa1f8e28eed654f2b7 Mon Sep 17 00:00:00 2001 From: Yi Zhang Date: Fri, 27 Mar 2026 11:41:35 +0800 Subject: [PATCH 2/2] [hotfix][runtime] Fix DispatcherCleanupITCase.testCleanupAfterLeadershipChange --- .../flink/runtime/dispatcher/DispatcherCleanupITCase.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java index da202b0dc443e..4eca35fc00f22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java @@ -318,6 +318,8 @@ public void testCleanupAfterLeadershipChange() throws Exception { waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId); firstCleanupTriggered.await(); + CommonTestUtils.waitUntilCondition( + () -> !haServices.getApplicationResultStore().getDirtyResults().isEmpty()); assertThat(actualGlobalCleanupCallCount.get()) .as("The cleanup should have been triggered only once.") @@ -335,12 +337,6 @@ public void testCleanupAfterLeadershipChange() throws Exception { .collect(Collectors.toSet())) .as("The JobResultStore should have this job marked as dirty.") .containsExactly(jobId); - assertThat( - haServices.getApplicationResultStore().getDirtyResults().stream() - .map(ApplicationResult::getApplicationId) - .collect(Collectors.toSet())) - .as("The ApplicationResultStore should have this application marked as dirty.") - .containsExactly(applicationId); // Run a second dispatcher, that restores our finished job. final Dispatcher secondDispatcher =