Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobStatus;

import org.apache.flink.shaded.guava33.com.google.common.collect.BiMap;
Expand Down Expand Up @@ -89,4 +90,28 @@ public JobStatus deriveJobStatus() {

return JOB_STATUS_APPLICATION_STATUS_BI_MAP.inverse().get(this);
}

/**
* Derives the ApplicationStatus that corresponds to the given ApplicationState. If the
* ApplicationState is not a terminal state, this method returns {@link #UNKNOWN}.
*
* <p>Note: {@code ApplicationState} covers the entire lifecycle of an application, representing
* various stages from created to finish. {@code ApplicationStatus}, on the other hand,
* describes the final state of the cluster when shutdown.
*
* @param applicationState the application state
* @return the corresponding status
*/
public static ApplicationStatus fromApplicationState(ApplicationState applicationState) {
switch (applicationState) {
case FAILED:
return FAILED;
case CANCELED:
return CANCELED;
case FINISHED:
return SUCCEEDED;
default:
return UNKNOWN;
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ public Set<ApplicationResult> getDirtyResults() throws IOException {
@GuardedBy("readWriteLock")
protected abstract Set<ApplicationResult> getDirtyResultsInternal() throws IOException;

@Override
public CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
ApplicationID applicationId) {
return withReadLockAsync(() -> getCleanApplicationResultInternal(applicationId));
}

@GuardedBy("readWriteLock")
protected abstract ApplicationResult getCleanApplicationResultInternal(
ApplicationID applicationId) throws IOException;

private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
return FutureUtils.runAsync(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,21 @@ default CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID
* @throws IOException if collecting the set of dirty results failed for IO reasons.
*/
Set<ApplicationResult> getDirtyResults() throws IOException;

/**
* Asynchronously gets the persisted {@link ApplicationResult} instance that is marked as {@code
* clean} for the given {@code ApplicationID}.
*
* <p>This method is used to determine whether the application has already completed and been
* cleaned up, thereby avoiding duplicate execution. Note that it only works when {@link
* ApplicationResultStoreOptions#DELETE_ON_COMMIT} is set to {@code false}; otherwise clean
* entries are deleted upon commit and cannot be retrieved.
*
* @param applicationId Ident of the application we wish to get.
* @return a {@link CompletableFuture} that completes with the {@link ApplicationResult} if a
* clean entry exists for the given {@code applicationId}; otherwise a successfully
* completed future with {@code null}.
*/
CompletableFuture<ApplicationResult> getCleanApplicationResultAsync(
ApplicationID applicationId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ protected Set<ApplicationResult> getDirtyResultsInternal() {
.collect(Collectors.toSet());
}

@Override
protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId) {
final ApplicationResultEntry entry = cleanResults.get(applicationId);
if (entry == null) {
return null;
}

return entry.getApplicationResult();
}

/** Clears all stored results. */
public void clear() {
dirtyResults.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,23 @@ public Set<ApplicationResult> getDirtyResultsInternal() throws IOException {
return dirtyResults;
}

@Override
protected ApplicationResult getCleanApplicationResultInternal(ApplicationID applicationId)
throws IOException {
if (deleteOnCommit) {
return null;
}

Path cleanPath = constructCleanPath(applicationId);
if (!fileSystem.exists(cleanPath)) {
return null;
}

JsonApplicationResultEntry jre =
mapper.readValue(fileSystem.open(cleanPath), JsonApplicationResultEntry.class);
return jre.getApplicationResult();
}

/**
* Wrapper class around {@link ApplicationResultEntry} to allow for serialization of a schema
* version, so that future schema changes can be handled in a backwards compatible manner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobStatus;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -115,6 +116,34 @@ void testUnknownApplicationStatusForMissingJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
}

@Test
void testFailedApplicationStatusFromApplicationState() {
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FAILED))
.isEqualTo(ApplicationStatus.FAILED);
}

@Test
void testCancelledApplicationStatusFromApplicationState() {
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.CANCELED))
.isEqualTo(ApplicationStatus.CANCELED);
}

@Test
void testSucceededApplicationStatusFromApplicationState() {
assertThat(ApplicationStatus.fromApplicationState(ApplicationState.FINISHED))
.isEqualTo(ApplicationStatus.SUCCEEDED);
}

@ParameterizedTest
@EnumSource(
value = ApplicationState.class,
names = {"CREATED", "RUNNING", "FAILING", "CANCELING"})
public void testUnknownApplicationStatusFromNonTerminalApplicationState(
ApplicationState applicationState) {
assertThat(ApplicationStatus.fromApplicationState(applicationState))
.isEqualTo(ApplicationStatus.UNKNOWN);
}

private static Iterable<Integer> exitCodes(Iterable<ApplicationStatus> statuses) {
return StreamSupport.stream(statuses.spliterator(), false)
.map(ApplicationStatus::processExitCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.application.AbstractApplication;
Expand Down Expand Up @@ -50,6 +51,8 @@
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkApplicationTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
Expand All @@ -58,6 +61,7 @@
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -736,6 +740,183 @@ public void testDuplicateSubmissionWithRecoveredApplication() throws Exception {
.hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
}

@Test
public void testDuplicateSubmissionWithTerminatedButDirtyApplication() throws Exception {
final ApplicationResult applicationResult =
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
haServices
.getApplicationResultStore()
.createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
.get();

assertDuplicateApplicationSubmission();
}

@Test
public void testDuplicateSubmissionWithTerminatedAndCleanedApplication() throws Exception {
final ApplicationResult applicationResult =
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
haServices
.getApplicationResultStore()
.createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
.get();
haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();

assertDuplicateApplicationSubmission();
}

private void assertDuplicateApplicationSubmission() throws Exception {
dispatcher = createTestingDispatcherBuilder().build(rpcService);
dispatcher.start();

final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);

final AbstractApplication application =
TestingApplication.builder().setApplicationId(applicationId).build();

final CompletableFuture<Acknowledge> submitFuture =
dispatcherGateway.submitApplication(application, TIMEOUT);

assertThatThrownBy(submitFuture::get)
.hasCauseInstanceOf(DuplicateApplicationSubmissionException.class);
}

@Test
public void testApplicationBootstrapWithDirtyResultTriggersShutdown() throws Exception {
testApplicationBootstrapWithApplicationResult(false, true);
}

@Test
public void testApplicationBootstrapWithDirtyResultDoesNotTriggerShutdownWhenDisabled()
throws Exception {
testApplicationBootstrapWithApplicationResult(false, false);
}

@Test
public void testApplicationBootstrapWithCleanResultTriggersShutdown() throws Exception {
testApplicationBootstrapWithApplicationResult(true, true);
}

@Test
public void testApplicationBootstrapWithCleanResultDoesNotTriggerShutdownWhenDisabled()
throws Exception {
testApplicationBootstrapWithApplicationResult(true, false);
}

private void testApplicationBootstrapWithApplicationResult(
boolean isCleanResult, boolean triggerShutDown) throws Exception {
configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, triggerShutDown);

final ApplicationResult applicationResult =
TestingApplicationResultStore.createSuccessfulApplicationResult(applicationId);
haServices
.getApplicationResultStore()
.createDirtyResultAsync(new ApplicationResultEntry(applicationResult))
.get();
if (isCleanResult) {
haServices.getApplicationResultStore().markResultAsCleanAsync(applicationId).get();
}

final OneShotLatch bootstrapLatch = new OneShotLatch();
final TestingDispatcher.Builder builder =
createTestingDispatcherBuilder()
.setDispatcherBootstrapFactory(
(ignoredDispatcherGateway,
ignoredScheduledExecutor,
ignoredFatalErrorHandler) ->
new ApplicationBootstrap(
TestingApplication.builder()
.setApplicationId(applicationId)
.setExecuteFunction(
ignoredExecuteParams -> {
bootstrapLatch.trigger();
return CompletableFuture
.completedFuture(
Acknowledge
.get());
})
.build()));
if (!isCleanResult) {
builder.setRecoveredDirtyApplications(Collections.singleton(applicationResult));
}
dispatcher = builder.build(rpcService);
dispatcher.start();

if (triggerShutDown) {
assertEquals(
ApplicationStatus.SUCCEEDED,
dispatcher.getShutDownFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS));
} else {
assertThatThrownBy(
() -> dispatcher.getShutDownFuture().get(100L, TimeUnit.MILLISECONDS))
.isInstanceOf(TimeoutException.class);
}

assertFalse(bootstrapLatch.isTriggered());
}

@Test
public void testRequestMultipleApplicationDetailsDeduplication() throws Exception {
final OneShotLatch markAsDirtyLatch = new OneShotLatch();
dispatcher =
createTestingDispatcherBuilder()
.setApplicationResultStore(
TestingApplicationResultStore.builder()
.withCreateDirtyResultConsumer(
ignoredApplicationResultEntry -> {
try {
markAsDirtyLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return FutureUtils.completedExceptionally(
e);
}
return FutureUtils.completedVoidFuture();
})
.build())
.build(rpcService);
dispatcher.start();
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);

final AbstractApplication application =
TestingApplication.builder().setApplicationId(applicationId).build();

dispatcherGateway.submitApplication(application, TIMEOUT).get();
mockApplicationStatusChange(ApplicationState.FINISHED);

// The application should not be terminated yet because dirty result creation is blocked
assertThatThrownBy(
() ->
dispatcher
.getApplicationTerminationFuture(applicationId)
.get(100L, TimeUnit.MILLISECONDS))
.isInstanceOf(TimeoutException.class);

assertThat(dispatcher.getApplications().keySet()).containsExactly(applicationId);
assertThat(dispatcher.getArchivedApplicationStore().get(applicationId)).isPresent();

// The completed application should appear only once in the result
final MultipleApplicationsDetails multipleApplicationsDetails =
dispatcher.requestMultipleApplicationDetails(TIMEOUT).get();
assertThat(multipleApplicationsDetails.getApplications()).hasSize(1);

final ApplicationDetails applicationDetails =
multipleApplicationsDetails.getApplications().iterator().next();
assertThat(applicationDetails.getApplicationId()).isEqualTo(applicationId);

// Allow the application to be terminated
markAsDirtyLatch.trigger();
dispatcher
.getApplicationTerminationFuture(applicationId)
.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);

assertThat(dispatcher.getApplications()).isEmpty();
assertThat(dispatcher.requestMultipleApplicationDetails(TIMEOUT).get().getApplications())
.hasSize(1);
}

@Test
public void testThatDirtilyFinishedApplicationsNotRetriggered() {
final AbstractApplication application =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ public void testCleanupAfterLeadershipChange() throws Exception {

waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
firstCleanupTriggered.await();
CommonTestUtils.waitUntilCondition(
() -> !haServices.getApplicationResultStore().getDirtyResults().isEmpty());

assertThat(actualGlobalCleanupCallCount.get())
.as("The cleanup should have been triggered only once.")
Expand All @@ -335,12 +337,6 @@ public void testCleanupAfterLeadershipChange() throws Exception {
.collect(Collectors.toSet()))
.as("The JobResultStore should have this job marked as dirty.")
.containsExactly(jobId);
assertThat(
haServices.getApplicationResultStore().getDirtyResults().stream()
.map(ApplicationResult::getApplicationId)
.collect(Collectors.toSet()))
.as("The ApplicationResultStore should have this application marked as dirty.")
.containsExactly(applicationId);

// Run a second dispatcher, that restores our finished job.
final Dispatcher secondDispatcher =
Expand Down
Loading