diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index cbfc20b79..102e441f1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -3,7 +3,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.Timestamps; -import com.uber.m3.tally.Scope; import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.Payloads; import io.temporal.api.history.v1.HistoryEvent; @@ -14,14 +13,12 @@ import io.temporal.api.update.v1.Input; import io.temporal.api.update.v1.Request; import io.temporal.failure.CanceledFailure; -import io.temporal.internal.common.FailureUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.UpdateMessage; import io.temporal.internal.statemachines.WorkflowStateMachines; import io.temporal.internal.sync.SignalHandlerInfo; import io.temporal.internal.sync.UpdateHandlerInfo; import io.temporal.internal.worker.WorkflowExecutionException; -import io.temporal.worker.MetricsType; import io.temporal.worker.NonDeterministicException; import io.temporal.workflow.HandlerUnfinishedPolicy; import java.util.List; @@ -63,8 +60,6 @@ final class ReplayWorkflowExecutor { private final ReplayWorkflowContextImpl context; - private final Scope metricsScope; - public ReplayWorkflowExecutor( ReplayWorkflow workflow, WorkflowStateMachines workflowStateMachines, @@ -72,7 +67,6 @@ public ReplayWorkflowExecutor( this.workflow = workflow; this.workflowStateMachines = workflowStateMachines; this.context = context; - this.metricsScope = context.getMetricsScope(); } public void eventLoop() { @@ -131,12 +125,8 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) { if (context.isCancelRequested()) { workflowStateMachines.cancelWorkflow(); - metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1); } else if (failure != null) { workflowStateMachines.failWorkflow(failure.getFailure()); - if (!FailureUtils.isBenignApplicationFailure(failure.getFailure())) { - metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); - } } else { ContinueAsNewWorkflowExecutionCommandAttributes attributes = context.getContinueAsNewOnCompletion(); @@ -152,15 +142,9 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) { // This way attributes will need to be carried over in the mutable state and the flow // generally will be aligned with the flow of other commands. workflowStateMachines.continueAsNewWorkflow(attributes); - - // TODO Issue #1590 - metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1); } else { Optional workflowOutput = workflow.getOutput(); workflowStateMachines.completeWorkflow(workflowOutput); - - // TODO Issue #1590 - metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1); } } @@ -168,7 +152,7 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) { ProtobufTimeUtils.toM3Duration( Timestamps.fromMillis(System.currentTimeMillis()), Timestamps.fromMillis(context.getRunStartedTimestampMillis())); - metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d); + workflowStateMachines.setPostCompletionEndToEndLatency(d); } public void handleWorkflowExecutionCancelRequested(HistoryEvent event) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index d081f0f38..1d0d09a69 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -185,6 +185,21 @@ public WorkflowTaskResult handleWorkflowTask( if (workflow.getWorkflowContext() != null) { result.setVersioningBehavior(workflow.getWorkflowContext().getVersioningBehavior()); } + // Setup post-completion metrics to be applied after task response accepted + String postCompleteCounter = workflowStateMachines.getPostCompletionMetricCounter(); + com.uber.m3.util.Duration postCompleteLatency = + workflowStateMachines.getPostCompletionEndToEndLatency(); + if (postCompleteCounter != null || postCompleteLatency != null) { + result.setApplyPostCompletionMetrics( + () -> { + if (postCompleteCounter != null) { + metricsScope.counter(postCompleteCounter).inc(1); + } + if (postCompleteLatency != null) { + metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(postCompleteLatency); + } + }); + } return result.build(); } finally { lock.unlock(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java index 47e753b67..f5b7cb0d2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java @@ -252,7 +252,8 @@ private Result createCompletedWFTRequest( null, null, result.isFinalCommand(), - eventIdSetHandle); + eventIdSetHandle, + result.getApplyPostCompletionMetrics()); } private Result failureToWFTResult( @@ -275,7 +276,8 @@ private Result failureToWFTResult( .setFailure(((WorkflowExecutionException) e).getFailure())) .build()) .build(); - return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false, null); + return new WorkflowTaskHandler.Result( + workflowType, response, null, null, null, false, null, null); } WorkflowExecution execution = workflowTask.getWorkflowExecution(); @@ -316,7 +318,7 @@ private Result failureToWFTResult( WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE); } return new WorkflowTaskHandler.Result( - workflowType, null, failedRequest.build(), null, null, false, null); + workflowType, null, failedRequest.build(), null, null, false, null, null); } private Result createDirectQueryResult( @@ -346,6 +348,7 @@ private Result createDirectQueryResult( queryCompletedRequest.build(), null, false, + null, null); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java index 3eabd9bc7..9433b99f9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java @@ -25,6 +25,7 @@ public static final class Builder { private String writeSdkName; private String writeSdkVersion; private VersioningBehavior versioningBehavior; + private Runnable applyPostCompletionMetrics; public Builder setCommands(List commands) { this.commands = commands; @@ -76,6 +77,11 @@ public Builder setVersioningBehavior(VersioningBehavior versioningBehavior) { return this; } + public Builder setApplyPostCompletionMetrics(Runnable applyPostCompletionMetrics) { + this.applyPostCompletionMetrics = applyPostCompletionMetrics; + return this; + } + public WorkflowTaskResult build() { return new WorkflowTaskResult( commands == null ? Collections.emptyList() : commands, @@ -87,7 +93,8 @@ public WorkflowTaskResult build() { sdkFlags == null ? Collections.emptyList() : sdkFlags, writeSdkName, writeSdkVersion, - versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior); + versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior, + applyPostCompletionMetrics); } } @@ -101,6 +108,7 @@ public WorkflowTaskResult build() { private final String writeSdkName; private final String writeSdkVersion; private final VersioningBehavior versioningBehavior; + private final Runnable applyPostCompletionMetrics; private WorkflowTaskResult( List commands, @@ -112,7 +120,8 @@ private WorkflowTaskResult( List sdkFlags, String writeSdkName, String writeSdkVersion, - VersioningBehavior versioningBehavior) { + VersioningBehavior versioningBehavior, + Runnable applyPostCompletionMetrics) { this.commands = commands; this.messages = messages; this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts; @@ -126,6 +135,7 @@ private WorkflowTaskResult( this.writeSdkName = writeSdkName; this.writeSdkVersion = writeSdkVersion; this.versioningBehavior = versioningBehavior; + this.applyPostCompletionMetrics = applyPostCompletionMetrics; } public List getCommands() { @@ -168,4 +178,8 @@ public String getWriteSdkVersion() { public VersioningBehavior getVersioningBehavior() { return versioningBehavior; } + + public Runnable getApplyPostCompletionMetrics() { + return applyPostCompletionMetrics; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index eda2b4767..2f2c716f2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -10,6 +10,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.protobuf.Any; +import com.uber.m3.util.Duration; import io.temporal.api.command.v1.*; import io.temporal.api.common.v1.*; import io.temporal.api.enums.v1.EventType; @@ -26,6 +27,7 @@ import io.temporal.internal.sync.WorkflowThread; import io.temporal.internal.worker.LocalActivityResult; import io.temporal.serviceclient.Version; +import io.temporal.worker.MetricsType; import io.temporal.worker.NonDeterministicException; import io.temporal.worker.WorkflowImplementationOptions; import io.temporal.workflow.ChildWorkflowCancellationType; @@ -187,6 +189,9 @@ enum HandleEventStatus { */ private boolean shouldSkipUpsertVersionSA = false; + private String postCompletionMetricCounter; + private com.uber.m3.util.Duration postCompletionEndToEndLatency; + public WorkflowStateMachines( StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities, @@ -873,6 +878,18 @@ public long getLastStartedEventId() { return lastWFTStartedEventId; } + public String getPostCompletionMetricCounter() { + return postCompletionMetricCounter; + } + + public Duration getPostCompletionEndToEndLatency() { + return postCompletionEndToEndLatency; + } + + public void setPostCompletionEndToEndLatency(Duration postCompletionEndToEndLatency) { + this.postCompletionEndToEndLatency = postCompletionEndToEndLatency; + } + /** * @param attributes attributes used to schedule an activity * @param callback completion callback @@ -1112,11 +1129,15 @@ public void upsertMemo(Memo memo) { public void completeWorkflow(Optional workflowOutput) { checkEventLoopExecuting(); CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink); + postCompletionMetricCounter = MetricsType.WORKFLOW_COMPLETED_COUNTER; } public void failWorkflow(Failure failure) { checkEventLoopExecuting(); FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink); + if (!FailureUtils.isBenignApplicationFailure(failure)) { + postCompletionMetricCounter = MetricsType.WORKFLOW_FAILED_COUNTER; + } } public void cancelWorkflow() { @@ -1125,11 +1146,13 @@ public void cancelWorkflow() { CancelWorkflowExecutionCommandAttributes.getDefaultInstance(), commandSink, stateMachineSink); + postCompletionMetricCounter = MetricsType.WORKFLOW_CANCELED_COUNTER; } public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) { checkEventLoopExecuting(); ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink); + postCompletionMetricCounter = MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER; } public boolean isReplaying() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java index 6ebfe841b..129847fff 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java @@ -22,6 +22,7 @@ final class Result { private final RpcRetryOptions requestRetryOptions; private final boolean completionCommand; private final Functions.Proc1 resetEventIdHandle; + private final Runnable applyPostCompletionMetrics; public Result( String workflowType, @@ -30,7 +31,8 @@ public Result( RespondQueryTaskCompletedRequest queryCompleted, RpcRetryOptions requestRetryOptions, boolean completionCommand, - Functions.Proc1 resetEventIdHandle) { + Functions.Proc1 resetEventIdHandle, + Runnable applyPostCompletionMetrics) { this.workflowType = workflowType; this.taskCompleted = taskCompleted; this.taskFailed = taskFailed; @@ -38,6 +40,7 @@ public Result( this.requestRetryOptions = requestRetryOptions; this.completionCommand = completionCommand; this.resetEventIdHandle = resetEventIdHandle; + this.applyPostCompletionMetrics = applyPostCompletionMetrics; } public RespondWorkflowTaskCompletedRequest getTaskCompleted() { @@ -67,6 +70,10 @@ public Functions.Proc1 getResetEventIdHandle() { return (arg) -> {}; } + public Runnable getApplyPostCompletionMetrics() { + return applyPostCompletionMetrics; + } + @Override public String toString() { return "Result{" diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 1b64e6915..2193acf59 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -467,6 +467,11 @@ public void handle(WorkflowTask task) throws Exception { result.getRequestRetryOptions(), workflowTypeScope); } + + // Apply post-completion metrics only if runnable present and the above succeeded + if (result.getApplyPostCompletionMetrics() != null) { + result.getApplyPostCompletionMetrics().run(); + } } catch (GrpcMessageTooLargeException e) { // Only fail workflow task on the first attempt, subsequent failures of the same // workflow task should timeout. diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java index 6d29ffa11..98fddf17c 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/MetricsTest.java @@ -1,27 +1,33 @@ package io.temporal.client.functional; +import static io.temporal.testUtils.Eventually.assertEventually; import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; -import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.*; import com.uber.m3.tally.RootScopeBuilder; -import com.uber.m3.tally.Scope; -import com.uber.m3.tally.StatsReporter; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.ImmutableTag; -import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.temporal.client.WorkflowClient; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowTaskFailedCause; +import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; import io.temporal.common.reporter.MicrometerClientStatsReporter; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.CanceledFailure; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.MetricsType; -import io.temporal.serviceclient.WorkflowServiceStubs; -import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; +import java.time.Duration; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; @@ -32,16 +38,20 @@ public class MetricsTest { private static final long REPORTING_FLUSH_TIME = 50; + private final SimpleMeterRegistry registry = new SimpleMeterRegistry(); + private final RunCallbackActivityImpl runCallbackActivity = new RunCallbackActivityImpl(); + @Rule - public SDKTestWorkflowRule testWorkflowRule = + public final SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(QuicklyCompletingWorkflowImpl.class) + .setWorkflowTypes(QuicklyCompletingWorkflowImpl.class, MultiScenarioWorkflowImpl.class) + .setActivityImplementations(runCallbackActivity) + .setMetricsScope( + new RootScopeBuilder() + .reporter(new MicrometerClientStatsReporter(registry)) + .reportEvery(com.uber.m3.util.Duration.ofMillis(REPORTING_FLUSH_TIME >> 1))) .build(); - private SimpleMeterRegistry registry; - private WorkflowServiceStubs clientStubs; - private WorkflowClient workflowClient; - private static final List TAGS_NAMESPACE = MetricsTag.defaultTags(NAMESPACE).entrySet().stream() .map( @@ -49,91 +59,222 @@ public class MetricsTest { new ImmutableTag(nameValueEntry.getKey(), nameValueEntry.getValue())) .collect(Collectors.toList()); - private static List TAGS_NAMESPACE_QUEUE; + private List tagsNamespaceQueue; @Before public void setUp() { - this.registry = new SimpleMeterRegistry(); - StatsReporter reporter = new MicrometerClientStatsReporter(registry); - Scope metricsScope = - new RootScopeBuilder() - .reporter(reporter) - .reportEvery(com.uber.m3.util.Duration.ofMillis(REPORTING_FLUSH_TIME >> 1)); - - testWorkflowRule.getWorkflowClient().getWorkflowServiceStubs(); - WorkflowServiceStubsOptions options = - testWorkflowRule.getWorkflowClient().getWorkflowServiceStubs().getOptions(); - WorkflowServiceStubsOptions modifiedOptions = - WorkflowServiceStubsOptions.newBuilder(options).setMetricsScope(metricsScope).build(); - - this.clientStubs = WorkflowServiceStubs.newServiceStubs(modifiedOptions); - this.workflowClient = - WorkflowClient.newInstance(clientStubs, testWorkflowRule.getWorkflowClient().getOptions()); - - TAGS_NAMESPACE_QUEUE = + tagsNamespaceQueue = replaceTags(TAGS_NAMESPACE, MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue()); } @After public void tearDown() { - this.clientStubs.shutdownNow(); this.registry.close(); } @Test public void testSynchronousStartAndGetResult() throws InterruptedException { QuicklyCompletingWorkflow quicklyCompletingWorkflow = - workflowClient.newWorkflowStub( - QuicklyCompletingWorkflow.class, - WorkflowOptions.newBuilder() - .setTaskQueue(testWorkflowRule.getTaskQueue()) - .validateBuildWithDefaults()); + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + QuicklyCompletingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); quicklyCompletingWorkflow.execute(); - Thread.sleep(REPORTING_FLUSH_TIME); List startRequestTags = replaceTags( - TAGS_NAMESPACE_QUEUE, + tagsNamespaceQueue, MetricsTag.OPERATION_NAME, "StartWorkflowExecution", MetricsTag.WORKFLOW_TYPE, "QuicklyCompletingWorkflow"); - - assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_REQUEST, startRequestTags)); - List longPollRequestTags = replaceTag(TAGS_NAMESPACE, MetricsTag.OPERATION_NAME, "GetWorkflowExecutionHistory"); - assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_LONG_REQUEST, longPollRequestTags)); + assertEventually( + Duration.ofSeconds(2), + () -> { + assertIntCounter( + 1, registry.counter(MetricsType.TEMPORAL_LONG_REQUEST, longPollRequestTags)); + assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_REQUEST, startRequestTags)); + }); } @Test public void testAsynchronousStartAndGetResult() throws InterruptedException, ExecutionException { QuicklyCompletingWorkflow quicklyCompletingWorkflow = - workflowClient.newWorkflowStub( - QuicklyCompletingWorkflow.class, - WorkflowOptions.newBuilder() - .setTaskQueue(testWorkflowRule.getTaskQueue()) - .validateBuildWithDefaults()); + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + QuicklyCompletingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); WorkflowStub workflowStub = WorkflowStub.fromTyped(quicklyCompletingWorkflow); workflowStub.start(); workflowStub.getResultAsync(String.class).get(); - Thread.sleep(REPORTING_FLUSH_TIME); List startRequestTags = replaceTags( - TAGS_NAMESPACE_QUEUE, + tagsNamespaceQueue, MetricsTag.OPERATION_NAME, "StartWorkflowExecution", MetricsTag.WORKFLOW_TYPE, "QuicklyCompletingWorkflow"); - - assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_REQUEST, startRequestTags)); - List longPollRequestTags = replaceTag(TAGS_NAMESPACE, MetricsTag.OPERATION_NAME, "GetWorkflowExecutionHistory"); - assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_LONG_REQUEST, longPollRequestTags)); + assertEventually( + Duration.ofSeconds(2), + () -> { + assertIntCounter(1, registry.counter(MetricsType.TEMPORAL_REQUEST, startRequestTags)); + assertIntCounter( + 1, registry.counter(MetricsType.TEMPORAL_LONG_REQUEST, longPollRequestTags)); + }); + } + + @Test + public void testWorkflowSuccess() { + String result = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + MultiScenarioWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(MultiScenarioWorkflow.Scenario.SUCCESS); + assertEquals("success", result); + assertSingleMeterCountForMultiScenario( + io.temporal.worker.MetricsType.WORKFLOW_COMPLETED_COUNTER); + assertSingleMeterCountForMultiScenario(io.temporal.worker.MetricsType.WORKFLOW_E2E_LATENCY); + } + + @Test + public void testWorkflowFailure() { + try { + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + MultiScenarioWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(MultiScenarioWorkflow.Scenario.FAILURE); + fail(); + } catch (WorkflowFailedException e) { + assertTrue(e.getCause() instanceof ApplicationFailure); + } + assertSingleMeterCountForMultiScenario(io.temporal.worker.MetricsType.WORKFLOW_FAILED_COUNTER); + assertSingleMeterCountForMultiScenario(io.temporal.worker.MetricsType.WORKFLOW_E2E_LATENCY); + } + + @Test + public void testWorkflowCancel() { + WorkflowStub stub = + WorkflowStub.fromTyped( + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + MultiScenarioWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults())); + WorkflowExecution exec = stub.start(MultiScenarioWorkflow.Scenario.WAIT_FOR_CANCEL); + testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(exec.getWorkflowId()).cancel(); + try { + stub.getResult(String.class); + fail(); + } catch (WorkflowFailedException e) { + assertTrue(e.getCause() instanceof CanceledFailure); + } + assertSingleMeterCountForMultiScenario( + io.temporal.worker.MetricsType.WORKFLOW_CANCELED_COUNTER); + assertSingleMeterCountForMultiScenario(io.temporal.worker.MetricsType.WORKFLOW_E2E_LATENCY); + } + + @Test + public void testWorkflowContinueAsNew() { + String result = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + MultiScenarioWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(MultiScenarioWorkflow.Scenario.CONTINUE_AS_NEW); + assertEquals("success", result); + assertSingleMeterCountForMultiScenario( + io.temporal.worker.MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER); + assertSingleMeterCountForMultiScenario( + io.temporal.worker.MetricsType.WORKFLOW_COMPLETED_COUNTER); + // We cannot reliably check e2e latency here because it compares current ms with event start ms, + // and event start ms for the second workflow of a continue as new when time skipping can be + // flaky + } + + @Test + public void testUnhandledCommand() throws Exception { + // We're going to have a local activity send a signal to cause unhandled command + String workflowId = UUID.randomUUID().toString(); + runCallbackActivity.callbackOnNextRun.set( + () -> + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(workflowId) + .signal("some-signal")); + + // Run the workflow and confirm success + String result = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + MultiScenarioWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowId(workflowId) + .validateBuildWithDefaults()) + .execute(MultiScenarioWorkflow.Scenario.UNHANDLED_COMMAND); + assertEquals("success", result); + + // Confirm unhandled command in history + assertTrue( + testWorkflowRule + .getWorkflowClient() + .streamHistory(workflowId) + .anyMatch( + evt -> + evt.getWorkflowTaskFailedEventAttributes().getCause() + == WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND)); + + // Confirm we only got the one workflow completed. Before the code fixes that were added with + // this test, this would have returned multiple workflow completed counts. + assertSingleMeterCountForMultiScenario( + io.temporal.worker.MetricsType.WORKFLOW_COMPLETED_COUNTER); + } + + private void assertSingleMeterCountForMultiScenario(String metric) { + assertEventually( + Duration.ofSeconds(2), + () -> { + AtomicInteger compCount = new AtomicInteger(); + registry.forEachMeter( + meter -> { + if (metric.equals(meter.getId().getName()) + && "MultiScenarioWorkflow".equals(meter.getId().getTag("workflow_type"))) { + if (meter instanceof Counter) { + compCount.addAndGet((int) ((Counter) meter).count()); + } else if (meter instanceof Timer) { + compCount.addAndGet((int) ((Timer) meter).count()); + } + } + }); + assertEquals(1, compCount.get()); + }); } private static List replaceTags(List tags, String... nameValuePairs) { @@ -167,4 +308,64 @@ public String execute() { return "done"; } } + + @WorkflowInterface + public interface MultiScenarioWorkflow { + enum Scenario { + SUCCESS, + FAILURE, + WAIT_FOR_CANCEL, + CONTINUE_AS_NEW, + UNHANDLED_COMMAND + } + + @WorkflowMethod + String execute(Scenario scenario); + } + + public static class MultiScenarioWorkflowImpl implements MultiScenarioWorkflow { + @Override + public String execute(Scenario scenario) { + switch (scenario) { + case SUCCESS: + return "success"; + case FAILURE: + throw ApplicationFailure.newFailure("Intentional failure", "failure"); + case WAIT_FOR_CANCEL: + Workflow.await(() -> false); + throw new IllegalStateException("Unreachable"); + case CONTINUE_AS_NEW: + Workflow.continueAsNew(Scenario.SUCCESS); + throw new IllegalStateException("Unreachable"); + case UNHANDLED_COMMAND: + Workflow.newLocalActivityStub( + RunCallbackActivity.class, + LocalActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build()) + .runCallback(); + return "success"; + default: + throw new UnsupportedOperationException(); + } + } + } + + @ActivityInterface + public interface RunCallbackActivity { + @ActivityMethod + void runCallback(); + } + + public static class RunCallbackActivityImpl implements RunCallbackActivity { + private final AtomicReference callbackOnNextRun = new AtomicReference<>(); + + @Override + public void runCallback() { + Runnable toRun = callbackOnNextRun.getAndSet(null); + if (toRun != null) { + toRun.run(); + } + } + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java index 3a9e3aa8c..725a933a5 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java @@ -1,5 +1,6 @@ package io.temporal.internal.worker; +import static io.temporal.testUtils.Eventually.assertEventually; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -146,8 +147,11 @@ public void nonDeterminismIncrementsWorkflowFailedMetric() { triggerNonDeterministicException = true; workflow.unblock(); assertThrows(WorkflowFailedException.class, () -> workflow.workflow()); - reporter.assertCounter( - MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflowWithSignal"), 1); + assertEventually( + Duration.ofSeconds(2), + () -> + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflowWithSignal"), 1)); } @Test @@ -161,7 +165,11 @@ public void runtimeExceptionWorkflowFailedMetric() { .setTaskQueue(testWorkflowRule.getTaskQueue()) .validateBuildWithDefaults()); assertThrows(WorkflowFailedException.class, () -> workflow.workflow(true)); - reporter.assertCounter(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1); + assertEventually( + Duration.ofSeconds(2), + () -> + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1)); } @Test @@ -175,7 +183,11 @@ public void applicationFailureWorkflowFailedMetric() { .setTaskQueue(testWorkflowRule.getTaskQueue()) .validateBuildWithDefaults()); assertThrows(WorkflowFailedException.class, () -> workflow.workflow(false)); - reporter.assertCounter(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1); + assertEventually( + Duration.ofSeconds(2), + () -> + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1)); } @Test @@ -201,8 +213,13 @@ public void workflowFailureMetricBenignApplicationError() { assertFalse("Failure should not be benign", isBenign); assertEquals("Non-benign failure", ((TemporalFailure) cause1).getOriginalMessage()); - reporter.assertCounter( - MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("ApplicationFailureWorkflow"), 1); + assertEventually( + Duration.ofSeconds(2), + () -> + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, + getWorkflowTags("ApplicationFailureWorkflow"), + 1)); ApplicationFailureWorkflow benignStub = client.newWorkflowStub( @@ -229,7 +246,12 @@ public void workflowFailureMetricBenignApplicationError() { assertTrue("Failure should be benign", isBenign); assertEquals("Benign failure", ((TemporalFailure) cause2).getOriginalMessage()); - reporter.assertCounter( - MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("ApplicationFailureWorkflow"), 1); + assertEventually( + Duration.ofSeconds(2), + () -> + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, + getWorkflowTags("ApplicationFailureWorkflow"), + 1)); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index bc4597ad6..7888f6cf8 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -148,7 +148,8 @@ public void concurrentPollRequestLockTest() throws Exception { (id) -> { // verify the lock is still being held assertEquals(1, runLockManager.totalLocks()); - }); + }, + null); }); // Mock the server responding to a workflow task complete with another workflow task @@ -301,6 +302,7 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { null, null, false, + null, null); }); @@ -362,7 +364,8 @@ public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueRespon (id) -> { resetEventIdQueue.add(id); result.getResetEventIdHandle().apply(id); - }); + }, + null); } @Override