diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
index f50374712747a..94707eb6c7e64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.clusterframework;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.shaded.guava33.com.google.common.collect.BiMap;
@@ -89,4 +90,28 @@ public JobStatus deriveJobStatus() {
return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(this);
}
+
+ /**
+ * Derives the ApplicationStatus that corresponds to the given ApplicationState. If the
+ * ApplicationState is not a terminal state, this method returns {@link #UNKNOWN}.
+ *
+ *
+     * <p>Note: {@code ApplicationState} covers the entire lifecycle of an application, representing
+ * various stages from created to finish. {@code ApplicationStatus}, on the other hand,
+ * describes the final state of the cluster when shutdown.
+ *
+ * @param applicationState the application state
+ * @return the corresponding status
+ */
+ public static ApplicationStatus fromApplicationState(ApplicationState applicationState) {
+ switch (applicationState) {
+ case FAILED:
+ return FAILED;
+ case CANCELED:
+ return CANCELED;
+ case FINISHED:
+ return SUCCEEDED;
+ default:
+ return UNKNOWN;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d5394704e327e..1c6c8d5bc163b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -31,6 +31,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WebOptions;
@@ -483,52 +484,14 @@ public void onStart() throws Exception {
this.getRpcService().getScheduledExecutor(),
this::onFatalError);
+ startApplicationsCleanup();
+
if (dispatcherBootstrap instanceof ApplicationBootstrap) {
// Application Mode
- checkState(suspendedApplications.isEmpty());
- checkState(recoveredDirtyApplicationResults.size() <= 1);
-
- AbstractApplication application =
- ((ApplicationBootstrap) dispatcherBootstrap).getApplication();
- if (!recoveredDirtyApplicationResults.isEmpty()) {
- // the application is already terminated
- ApplicationResult applicationResult =
- recoveredDirtyApplicationResults.iterator().next();
- checkState(
- application
- .getApplicationId()
- .equals(applicationResult.getApplicationId()));
-
- startApplicationCleanup();
- } else {
- // defer starting recovered jobs, as they might be skipped based on user logic
- internalSubmitApplication(application).get();
- }
+ maybeSubmitApplicationInApplicationMode();
} else {
// Session Mode
- startApplicationCleanup();
-
- // start suspended applications
- for (AbstractApplication suspendedApplication : suspendedApplications.values()) {
- // defer starting recovered jobs, as they might be skipped based on user logic
- internalSubmitApplication(suspendedApplication).get();
- }
-
- // start suspended jobs that do not belong to any application (previously submitted in a
- // SingleJobApplication) by wrapping them into a SingleJobApplication
-            Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator =
- suspendedJobs.entrySet().iterator();
- while (jobIterator.hasNext()) {
-                Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next();
- ExecutionPlan recoveredJob = entry.getValue();
- ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null);
-
- if (!suspendedApplications.containsKey(applicationId)) {
- runRecoveredJob(recoveredJob, true);
- jobIterator.remove();
- suspendedJobIdsByApplicationId.remove(applicationId);
- }
- }
+ recoverApplicationsAndJobsInSessionMode();
}
checkState(recoveredDirtyJobResultsByApplicationId.isEmpty());
@@ -673,49 +636,145 @@ private void runJobWithCleanupRunner(
}
}
- private void startApplicationCleanup() {
+ private void startApplicationsCleanup() {
for (ApplicationResult applicationResult : recoveredDirtyApplicationResults) {
- ApplicationID applicationId = applicationResult.getApplicationId();
- ApplicationState applicationState = applicationResult.getApplicationState();
-
-            Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
-            Collection<JobResult> dirtyJobResults =
- recoveredDirtyJobResultsByApplicationId.remove(applicationId);
- if (dirtyJobResults != null) {
- for (JobResult jobResult : dirtyJobResults) {
- JobID jobId = jobResult.getJobId();
- ExecutionGraphInfo executionGraphInfo =
- new ExecutionGraphInfo(
- ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
- jobResult, -1));
- jobs.put(jobId, executionGraphInfo);
-
- runJobWithCleanupRunner(jobResult, false);
- }
+ try {
+ startApplicationCleanup(applicationResult);
+ } catch (Throwable throwable) {
+ onFatalError(
+ new DispatcherException(
+ String.format(
+ "Could not start cleanup for application %s.",
+ applicationResult.getApplicationId()),
+ throwable));
}
+ }
+ }
- long[] stateTimestamps = new long[ApplicationState.values().length];
- stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime();
- stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime();
+ private void startApplicationCleanup(ApplicationResult applicationResult) {
+ ApplicationID applicationId = applicationResult.getApplicationId();
+ ApplicationState applicationState = applicationResult.getApplicationState();
- ArchivedApplication sparseArchivedApplication =
- new ArchivedApplication(
- applicationId,
- applicationResult.getApplicationName(),
- applicationState,
- stateTimestamps,
- jobs,
- Collections.emptyList());
+        Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
+        Collection<JobResult> dirtyJobResults =
+ recoveredDirtyJobResultsByApplicationId.remove(applicationId);
+ if (dirtyJobResults != null) {
+ for (JobResult jobResult : dirtyJobResults) {
+ JobID jobId = jobResult.getJobId();
+ ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+ jobResult, -1));
+ jobs.put(jobId, executionGraphInfo);
+
+ runJobWithCleanupRunner(jobResult, false);
+ }
+ }
- writeToArchivedApplicationStore(sparseArchivedApplication);
+ long[] stateTimestamps = new long[ApplicationState.values().length];
+ stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime();
+ stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime();
+
+ ArchivedApplication sparseArchivedApplication =
+ new ArchivedApplication(
+ applicationId,
+ applicationResult.getApplicationName(),
+ applicationState,
+ stateTimestamps,
+ jobs,
+ Collections.emptyList());
+
+ writeToArchivedApplicationStore(sparseArchivedApplication);
+
+ // the dirty result already exists
+ // create a completed future to make sure the jobs can be marked clean
+ applicationCreateDirtyResultFutures.put(applicationId, FutureUtils.completedVoidFuture());
+ applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
- // the dirty result already exists
- // create a completed future to make sure the jobs can be marked clean
- applicationCreateDirtyResultFutures.put(
- applicationId, FutureUtils.completedVoidFuture());
- applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
+ removeApplication(applicationId, jobs.keySet());
+ }
- removeApplication(applicationId, jobs.keySet());
+ private void recoverApplicationsAndJobsInSessionMode() {
+ for (AbstractApplication suspendedApplication : suspendedApplications.values()) {
+ // defer starting recovered jobs, as they might be skipped based on user logic
+ try {
+ internalSubmitApplication(suspendedApplication).get();
+ } catch (Throwable throwable) {
+ onFatalError(
+ new DispatcherException(
+ String.format(
+ "Could not start recovered application %s.",
+ suspendedApplication.getApplicationId()),
+ throwable));
+ }
+ }
+
+ // start suspended jobs that do not belong to any application (previously submitted in a
+ // SingleJobApplication) by wrapping them into a SingleJobApplication
+        Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator = suspendedJobs.entrySet().iterator();
+ while (jobIterator.hasNext()) {
+            Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next();
+ ExecutionPlan recoveredJob = entry.getValue();
+ ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null);
+ if (!suspendedApplications.containsKey(applicationId)) {
+ runRecoveredJob(recoveredJob, true);
+ jobIterator.remove();
+ suspendedJobIdsByApplicationId.remove(applicationId);
+ }
+ }
+ }
+
+ private void maybeSubmitApplicationInApplicationMode() {
+ checkState(suspendedApplications.isEmpty());
+ checkState(recoveredDirtyApplicationResults.size() <= 1);
+
+ AbstractApplication application =
+ ((ApplicationBootstrap) dispatcherBootstrap).getApplication();
+ ApplicationID applicationId = application.getApplicationId();
+ boolean shutDownOnApplicationFinish =
+ configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
+ if (!recoveredDirtyApplicationResults.isEmpty()) {
+ // the application is already terminated but needs to be cleaned up
+ ApplicationResult applicationResult =
+ recoveredDirtyApplicationResults.iterator().next();
+ checkState(applicationId.equals(applicationResult.getApplicationId()));
+
+ if (shutDownOnApplicationFinish) {
+ shutDownCluster(
+ ApplicationStatus.fromApplicationState(
+ applicationResult.getApplicationState()));
+ }
+ } else {
+ // check whether the application is already cleaned up
+ ApplicationResult applicationResult = null;
+ try {
+ applicationResult =
+ applicationResultStore.getCleanApplicationResultAsync(applicationId).get();
+ } catch (Throwable throwable) {
+ onFatalError(
+ new DispatcherException(
+ String.format(
+ "Could not get clean application result for application %s.",
+ applicationId),
+ throwable));
+ }
+
+ if (applicationResult == null) {
+ try {
+ internalSubmitApplication(application).get();
+ } catch (Throwable throwable) {
+ onFatalError(
+ new DispatcherException(
+ String.format("Could not start application %s.", applicationId),
+ throwable));
+ }
+ } else {
+ if (shutDownOnApplicationFinish) {
+ shutDownCluster(
+ ApplicationStatus.fromApplicationState(
+ applicationResult.getApplicationState()));
+ }
+ }
}
}
@@ -873,29 +932,45 @@ public CompletableFuture<Acknowledge> submitApplication(
final ApplicationID applicationId = application.getApplicationId();
log.info(
"Received application submission '{}' ({}).", application.getName(), applicationId);
+ return applicationResultStore
+ .hasApplicationResultEntryAsync(applicationId)
+ .thenComposeAsync(
+ isTerminated -> {
+ if (isTerminated) {
+ log.warn(
+ "Ignoring application submission '{}' ({}) because the application already "
+ + "reached a terminal state.",
+ application.getName(),
+ applicationId);
+ return FutureUtils.completedExceptionally(
+ new DuplicateApplicationSubmissionException(applicationId));
+ } else if (applications.containsKey(applicationId)
+ || archivedApplicationStore.get(applicationId).isPresent()) {
+ log.warn("Application with id {} already exists.", applicationId);
+ return FutureUtils.completedExceptionally(
+ new DuplicateApplicationSubmissionException(applicationId));
+ }
- if (applications.containsKey(applicationId)) {
- log.warn("Application with id {} already exists.", applicationId);
- throw new CompletionException(
- new DuplicateApplicationSubmissionException(applicationId));
- }
-
-        Optional<ApplicationStoreEntry> optionalApplicationStoreEntry =
- application.getApplicationStoreEntry();
- if (optionalApplicationStoreEntry.isPresent()) {
- try {
- applicationWriter.putApplication(optionalApplicationStoreEntry.get());
- } catch (Exception e) {
- String msg =
- String.format(
- "Could not persist application %s to the ApplicationStore.",
- applicationId);
- log.warn(msg);
- throw new CompletionException(new RuntimeException(msg, e));
- }
- }
+                        Optional<ApplicationStoreEntry> optionalApplicationStoreEntry =
+ application.getApplicationStoreEntry();
+ if (optionalApplicationStoreEntry.isPresent()) {
+ try {
+ applicationWriter.putApplication(
+ optionalApplicationStoreEntry.get());
+ } catch (Exception e) {
+ String msg =
+ String.format(
+ "Could not persist application %s to the ApplicationStore.",
+ applicationId);
+ log.warn(msg, e);
+ return FutureUtils.completedExceptionally(
+ new RuntimeException(msg, e));
+ }
+ }
- return internalSubmitApplication(application);
+ return internalSubmitApplication(application);
+ },
+ getMainThreadExecutor());
}
/** This method must be called from the main thread. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
index 5fbdacff80707..4f539dd7c1608 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
@@ -128,6 +128,16 @@ public Set<ApplicationResult> getDirtyResults() throws IOException {
@GuardedBy("readWriteLock")
protected abstract Set<ApplicationResult> getDirtyResultsInternal() throws IOException;
+ @Override
+    public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+ ApplicationID applicationId) {
+ return withReadLockAsync(() -> getCleanApplicationResultInternal(applicationId));
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract ApplicationResult getCleanApplicationResultInternal(
+ ApplicationID applicationId) throws IOException;
+
private CompletableFuture withWriteLockAsync(ThrowingRunnable runnable) {
return FutureUtils.runAsync(
() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
index 5606e8e8169e9..8e48b84d0e2c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java
@@ -109,4 +109,21 @@ default CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID
* @throws IOException if collecting the set of dirty results failed for IO reasons.
*/
Set<ApplicationResult> getDirtyResults() throws IOException;
+
+ /**
+ * Asynchronously gets the persisted {@link ApplicationResult} instance that is marked as {@code
+ * clean} for the given {@code ApplicationID}.
+ *
+     * <p>This method is used to determine whether the application has already completed and been
+ * cleaned up, thereby avoiding duplicate execution. Note that it only works when {@link
+ * ApplicationResultStoreOptions#DELETE_ON_COMMIT} is set to {@code false}; otherwise clean
+ * entries are deleted upon commit and cannot be retrieved.
+ *
+ * @param applicationId Ident of the application we wish to get.
+ * @return a {@link CompletableFuture} that completes with the {@link ApplicationResult} if a
+ * clean entry exists for the given {@code applicationId}; otherwise a successfully
+ * completed future with {@code null}.
+ */
+    CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+ ApplicationID applicationId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
index af987d6dba3aa..3ac50ebb3523e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java
@@ -75,6 +75,16 @@ protected Set<ApplicationResult> getDirtyResultsInternal() {
.collect(Collectors.toSet());
}
+ @Override
+ protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) {
+ final ApplicationResultEntry entry = cleanResults.get(applicationId);
+ if (entry == null) {
+ return null;
+ }
+
+ return entry.getApplicationResult();
+ }
+
/** Clears all stored results. */
public void clear() {
dirtyResults.clear();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
index fd938fede9b15..b4832110fad5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java
@@ -230,6 +230,23 @@ public Set<ApplicationResult> getDirtyResultsInternal() throws IOException {
return dirtyResults;
}
+ @Override
+ protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId)
+ throws IOException {
+ if (deleteOnCommit) {
+ return null;
+ }
+
+ Path cleanPath = constructCleanPath(applicationId);
+ if (!fileSystem.exists(cleanPath)) {
+ return null;
+ }
+
+ JsonApplicationResultEntry jre =
+ mapper.readValue(fileSystem.open(cleanPath), JsonApplicationResultEntry.class);
+ return jre.getApplicationResult();
+ }
+
/**
* Wrapper class around {@link ApplicationResultEntry} to allow for serialization of a schema
* version, so that future schema changes can be handled in a backwards compatible manner.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
index 507c0a4fe2675..7e4620b67a7bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.clusterframework;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobStatus;
import org.junit.jupiter.api.Test;
@@ -115,6 +116,34 @@ void testUnknownApplicationStatusForMissingJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
}
+ @Test
+ void testFailedApplicationStatusFromApplicationState() {
+ assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FAILED))
+ .isEqualTo(ApplicationStatus.FAILED);
+ }
+
+ @Test
+ void testCancelledApplicationStatusFromApplicationState() {
+ assertThat(ApplicationStatus.fromApplicationState(ApplicationState.CANCELED))
+ .isEqualTo(ApplicationStatus.CANCELED);
+ }
+
+ @Test
+ void testSucceededApplicationStatusFromApplicationState() {
+ assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FINISHED))
+ .isEqualTo(ApplicationStatus.SUCCEEDED);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ApplicationState.class,
+ names = {"CREATED", "RUNNING", "FAILING", "CANCELING"})
+ public void testUnknownApplicationStatusFromNonTerminalApplicationState(
+ ApplicationState applicationState) {
+ assertThat(ApplicationStatus.fromApplicationState(applicationState))
+ .isEqualTo(ApplicationStatus.UNKNOWN);
+ }
+
private static Iterable<Integer> exitCodes(Iterable<ApplicationStatus> statuses) {
return StreamSupport.stream(statuses.spliterator(), false)
.map(ApplicationStatus::processExitCode)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
index 16c8d76d0720e..dfcd7065feb1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.application.AbstractApplication;
@@ -736,6 +737,122 @@ public void testDuplicateSubmissionWithRecoveredApplication() throws Exception {
.hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
}
+ @Test
+ public void testDuplicateSubmissionWithTerminatedButDirtyApplication() throws Exception {
+ final ApplicationResult applicationResult =
+ TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+ haServices
+ .getApplicationResultStore()
+ .createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
+ .get();
+
+ assertDuplicateApplicationSubmission();
+ }
+
+ @Test
+ public void testDuplicateSubmissionWithTerminatedAndCleanedApplication() throws Exception {
+ final ApplicationResult applicationResult =
+ TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+ haServices
+ .getApplicationResultStore()
+ .createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
+ .get();
+ haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();
+
+ assertDuplicateApplicationSubmission();
+ }
+
+ private void assertDuplicateApplicationSubmission() throws Exception {
+ dispatcher = createTestingDispatcherBuilder().build(rpcService);
+ dispatcher.start();
+
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ final AbstractApplication application =
+ TestingApplication.builder().setApplicationId(applicationId).build();
+
+        final CompletableFuture<Acknowledge> submitFuture =
+ dispatcherGateway.submitApplication(application, TIMEOUT);
+
+ assertThatThrownBy(submitFuture::get)
+ .hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
+ }
+
+ @Test
+ public void testApplicationBootstrapWithDirtyResultTriggersShutdown() throws Exception {
+ testApplicationBootstrapWithApplicationResult(false, true);
+ }
+
+ @Test
+ public void testApplicationBootstrapWithDirtyResultDoesNotTriggerShutdownWhenDisabled()
+ throws Exception {
+ testApplicationBootstrapWithApplicationResult(false, false);
+ }
+
+ @Test
+ public void testApplicationBootstrapWithCleanResultTriggersShutdown() throws Exception {
+ testApplicationBootstrapWithApplicationResult(true, true);
+ }
+
+ @Test
+ public void testApplicationBootstrapWithCleanResultDoesNotTriggerShutdownWhenDisabled()
+ throws Exception {
+ testApplicationBootstrapWithApplicationResult(true, false);
+ }
+
+ private void testApplicationBootstrapWithApplicationResult(
+ boolean isCleanResult, boolean triggerShutDown) throws Exception {
+ configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, triggerShutDown);
+
+ final ApplicationResult applicationResult =
+ TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
+ haServices
+ .getApplicationResultStore()
+ .createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
+ .get();
+ if (isCleanResult) {
+ haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();
+ }
+
+ final OneShotLatch bootstrapLatch = new OneShotLatch();
+ final TestingDispatcher.Builder builder =
+ createTestingDispatcherBuilder()
+ .setDispatcherBootstrapFactory(
+ (ignoredDispatcherGateway,
+ ignoredScheduledExecutor,
+ ignoredFatalErrorHandler) ->
+ new ApplicationBootstrap(
+ TestingApplication.builder()
+ .setApplicationId(applicationId)
+ .setExecuteFunction(
+ ignoredExecuteParams -> {
+ bootstrapLatch.trigger();
+ return CompletableFuture
+ .completedFuture(
+ Acknowledge
+ .get());
+ })
+ .build()));
+ if (!isCleanResult) {
+ builder.setRecoveredDirtyApplications(Collections.singleton(applicationResult));
+ }
+ dispatcher = builder.build(rpcService);
+ dispatcher.start();
+
+ if (triggerShutDown) {
+ assertEquals(
+ ApplicationStatus.SUCCEEDED,
+ dispatcher.getShutDownFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS));
+ } else {
+ assertThatThrownBy(
+ () -> dispatcher.getShutDownFuture().get(100L, TimeUnit.MILLISECONDS))
+ .isInstanceOf(TimeoutException.class);
+ }
+
+ assertFalse(bootstrapLatch.isTriggered());
+ }
+
@Test
public void testThatDirtilyFinishedApplicationsNotRetriggered() {
final AbstractApplication application =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index da202b0dc443e..4eca35fc00f22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -318,6 +318,8 @@ public void testCleanupAfterLeadershipChange() throws Exception {
waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
firstCleanupTriggered.await();
+ CommonTestUtils.waitUntilCondition(
+ () -> !haServices.getApplicationResultStore().getDirtyResults().isEmpty());
assertThat(actualGlobalCleanupCallCount.get())
.as("The cleanup should have been triggered only once.")
@@ -335,12 +337,6 @@ public void testCleanupAfterLeadershipChange() throws Exception {
.collect(Collectors.toSet()))
.as("The JobResultStore should have this job marked as dirty.")
.containsExactly(jobId);
- assertThat(
- haServices.getApplicationResultStore().getDirtyResults().stream()
- .map(ApplicationResult::getApplicationId)
- .collect(Collectors.toSet()))
- .as("The ApplicationResultStore should have this application marked as dirty.")
- .containsExactly(applicationId);
// Run a second dispatcher, that restores our finished job.
final Dispatcher secondDispatcher =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
index 803f69c792073..51aa19be5c323 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java
@@ -208,4 +208,54 @@ default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
.singleElement()
.isEqualTo(otherDirtyApplicationResultEntry.getApplicationId());
}
+
+ @Test
+ default void testGetCleanApplicationResultWithCleanEntry() throws IOException {
+ ApplicationResultStore applicationResultStore = createApplicationResultStore();
+ applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join();
+ applicationResultStore
+ .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+
+ final ApplicationResult result =
+ applicationResultStore
+ .getCleanApplicationResultAsync(
+ DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+ assertThat(result).isNotNull();
+ assertThat(result.getApplicationId())
+ .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId());
+ assertThat(result.getApplicationState())
+ .isEqualTo(
+ DUMMY_APPLICATION_RESULT_ENTRY
+ .getApplicationResult()
+ .getApplicationState());
+ assertThat(result.getApplicationName())
+ .isEqualTo(
+ DUMMY_APPLICATION_RESULT_ENTRY.getApplicationResult().getApplicationName());
+ }
+
+ @Test
+ default void testGetCleanApplicationResultWithNoEntry() throws IOException {
+ ApplicationResultStore applicationResultStore = createApplicationResultStore();
+ final ApplicationResult result =
+ applicationResultStore
+ .getCleanApplicationResultAsync(
+ DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+ assertThat(result).isNull();
+ }
+
+ @Test
+ default void testGetCleanApplicationResultWithDirtyEntry() throws IOException {
+ ApplicationResultStore applicationResultStore = createApplicationResultStore();
+ applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join();
+
+ final ApplicationResult result =
+ applicationResultStore
+ .getCleanApplicationResultAsync(
+ DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+ assertThat(result).isNull();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
index fb4206a089292..08b5bf139ab1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java
@@ -21,11 +21,14 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.concurrent.Executors;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Tests for the {@link FileSystemApplicationResultStore} implementation of the {@link
* ApplicationResultStore}'s contracts.
@@ -40,4 +43,24 @@ public ApplicationResultStore createApplicationResultStore() throws IOException
return new FileSystemApplicationResultStore(
path.getFileSystem(), path, false, Executors.directExecutor());
}
+
+ @Test
+ void testGetCleanApplicationResultWhenDeleteOnCommitIsTrue() throws Exception {
+ Path path = new Path(temporaryFolder.toURI());
+ FileSystemApplicationResultStore applicationResultStore =
+ new FileSystemApplicationResultStore(
+ path.getFileSystem(), path, true, Executors.directExecutor());
+
+ applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join();
+ applicationResultStore
+ .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+
+ final ApplicationResult result =
+ applicationResultStore
+ .getCleanApplicationResultAsync(
+ DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())
+ .join();
+ assertThat(result).isNull();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
index cf6dae44f0f3b..b0bfa5a440bde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java
@@ -66,6 +66,8 @@ public static ApplicationResult createApplicationResult(
hasCleanApplicationResultEntryFunction;
    private final SupplierWithException<Set<ApplicationResult>, ? extends IOException>
getDirtyResultsSupplier;
+    private final Function<ApplicationID, CompletableFuture<ApplicationResult>>
+ getCleanApplicationResultFunction;
private TestingApplicationResultStore(
            Function<ApplicationResultEntry, CompletableFuture<Void>> createDirtyResultConsumer,
@@ -76,13 +78,16 @@ private TestingApplicationResultStore(
            Function<ApplicationID, CompletableFuture<Boolean>>
hasCleanApplicationResultEntryFunction,
            SupplierWithException<Set<ApplicationResult>, ? extends IOException>
- getDirtyResultsSupplier) {
+ getDirtyResultsSupplier,
+            Function<ApplicationID, CompletableFuture<ApplicationResult>>
+ getCleanApplicationResultFunction) {
this.createDirtyResultConsumer = createDirtyResultConsumer;
this.markResultAsCleanConsumer = markResultAsCleanConsumer;
this.hasApplicationResultEntryFunction = hasApplicationResultEntryFunction;
this.hasDirtyApplicationResultEntryFunction = hasDirtyApplicationResultEntryFunction;
this.hasCleanApplicationResultEntryFunction = hasCleanApplicationResultEntryFunction;
this.getDirtyResultsSupplier = getDirtyResultsSupplier;
+ this.getCleanApplicationResultFunction = getCleanApplicationResultFunction;
}
@Override
@@ -118,6 +123,12 @@ public Set getDirtyResults() throws IOException {
return getDirtyResultsSupplier.get();
}
+ @Override
+    public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
+ ApplicationID applicationId) {
+ return getCleanApplicationResultFunction.apply(applicationId);
+ }
+
public static TestingApplicationResultStore.Builder builder() {
return new Builder();
}
@@ -144,6 +155,12 @@ public static class Builder {
        private SupplierWithException<Set<ApplicationResult>, ? extends IOException>
getDirtyResultsSupplier = Collections::emptySet;
+    private Function<ApplicationID, CompletableFuture<ApplicationResult>>
+ getCleanApplicationResultFunction =
+ applicationID ->
+ CompletableFuture.completedFuture(
+ createSuccessfulApplicationResult(applicationID));
+
public Builder withCreateDirtyResultConsumer(
Function>
createDirtyResultConsumer) {
@@ -185,6 +202,13 @@ public Builder withGetDirtyResultsSupplier(
return this;
}
+ public Builder withGetCleanApplicationResultFunction(
+                Function<ApplicationID, CompletableFuture<ApplicationResult>>
+ getCleanApplicationResultFunction) {
+ this.getCleanApplicationResultFunction = getCleanApplicationResultFunction;
+ return this;
+ }
+
public TestingApplicationResultStore build() {
return new TestingApplicationResultStore(
createDirtyResultConsumer,
@@ -192,7 +216,8 @@ public TestingApplicationResultStore build() {
hasApplicationResultEntryFunction,
hasDirtyApplicationResultEntryFunction,
hasCleanApplicationResultEntryFunction,
- getDirtyResultsSupplier);
+ getDirtyResultsSupplier,
+ getCleanApplicationResultFunction);
}
}
}