diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go
index 08c99e689d..fd59db8f91 100644
--- a/common/metrics/metric_defs.go
+++ b/common/metrics/metric_defs.go
@@ -841,7 +841,12 @@ var (
 		WithDescription("The amount of time it took to successfully send a task to the DLQ. This only records the"+
 			" latency of the final attempt to send the task to the DLQ, not the cumulative latency of all attempts."),
 	)
-	TaskDiscarded = NewCounterDef("task_errors_discarded")
+	TaskDiscarded = NewCounterDef("task_errors_discarded")
+
+	WorkerCommandsDispatchSuccess  = NewCounterDef("worker_commands_dispatch_success")
+	WorkerCommandsDispatchFailure  = NewCounterDef("worker_commands_dispatch_failure")
+	WorkerCommandsDispatchNoPoller = NewCounterDef("worker_commands_dispatch_no_poller")
+	WorkerCommandsOperationFailure = NewCounterDef("worker_commands_operation_failure")
 	TaskSkipped = NewCounterDef("task_skipped")
 	TaskVersionMisMatch = NewCounterDef("task_errors_version_mismatch")
 	TasksDependencyTaskNotCompleted = NewCounterDef("task_dependency_task_not_completed")
diff --git a/common/nexus/matching_response.go b/common/nexus/matching_response.go
new file mode 100644
index 0000000000..30db7b1a44
--- /dev/null
+++ b/common/nexus/matching_response.go
@@ -0,0 +1,136 @@
+package nexus
+
+import (
+	"github.com/nexus-rpc/sdk-go/nexus"
+	enumspb "go.temporal.io/api/enums/v1"
+	failurepb "go.temporal.io/api/failure/v1"
+	nexuspb "go.temporal.io/api/nexus/v1"
+	matchingservice "go.temporal.io/server/api/matchingservice/v1"
+	"go.temporal.io/server/common/nexus/nexusrpc"
+)
+
+// StartOperationResponseToError converts a StartOperationResponse proto into a Nexus SDK error.
+// Returns nil only if the response indicates success (SyncSuccess or AsyncSuccess).
+//
+// Error types returned:
+//   - *nexus.HandlerError: internal errors (e.g., a nil response, an empty or unexpected
+//     response variant, or a failure that cannot be converted)
+//   - *nexus.OperationError: operation-level failures from the handler
+func StartOperationResponseToError(resp *nexuspb.StartOperationResponse) error {
+	switch t := resp.GetVariant().(type) {
+	case *nexuspb.StartOperationResponse_SyncSuccess:
+		return nil
+	case *nexuspb.StartOperationResponse_AsyncSuccess:
+		return nil
+	case *nexuspb.StartOperationResponse_OperationError:
+		//nolint:staticcheck // Deprecated variant still in use for backward compatibility.
+		opErr := &nexus.OperationError{
+			Message: "operation error",
+			//nolint:staticcheck // Deprecated function still in use for backward compatibility.
+			State: nexus.OperationState(t.OperationError.GetOperationState()),
+			Cause: &nexus.FailureError{
+				//nolint:staticcheck // Deprecated function still in use for backward compatibility.
+				Failure: ProtoFailureToNexusFailure(t.OperationError.GetFailure()),
+			},
+		}
+		if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
+			return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+		}
+		return opErr
+	case *nexuspb.StartOperationResponse_Failure:
+		return operationErrorFromTemporalFailure(t.Failure)
+	default:
+		// A nil response or an unset/unknown variant is not a success. Returning nil here would let
+		// callers that treat nil as "operation started" report success for a malformed response.
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome")
+	}
+}
+
+// DispatchNexusTaskResponseToError converts a DispatchNexusTaskResponse proto into a Nexus SDK
+// error. Returns nil if the response indicates success.
+//
+// This handles the outer dispatch envelope (timeout, handler error, failure) and delegates to
+// StartOperationResponseToError for the inner StartOperationResponse.
+//
+// Error types returned:
+//   - *nexus.HandlerError: transport/handler failures, timeouts
+//   - *nexus.OperationError: operation-level failures from the worker
+func DispatchNexusTaskResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error {
+	switch t := resp.GetOutcome().(type) {
+	case *matchingservice.DispatchNexusTaskResponse_Failure:
+		return handlerErrorFromTemporalFailure(t.Failure)
+	case *matchingservice.DispatchNexusTaskResponse_HandlerError:
+		return handlerErrorFromDeprecatedProto(t.HandlerError)
+	case *matchingservice.DispatchNexusTaskResponse_RequestTimeout:
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout")
+	case *matchingservice.DispatchNexusTaskResponse_Response:
+		return StartOperationResponseToError(t.Response.GetStartOperation())
+	default:
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome")
+	}
+}
+
+// handlerErrorFromTemporalFailure converts a Temporal failure from the dispatch envelope into the
+// error produced by the default Nexus failure converter. If either conversion step fails, a
+// generic internal *nexus.HandlerError is returned instead.
+func handlerErrorFromTemporalFailure(failure *failurepb.Failure) error {
+	nf, err := TemporalFailureToNexusFailure(failure)
+	if err != nil {
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+	}
+	he, err := nexusrpc.DefaultFailureConverter().FailureToError(nf)
+	if err != nil {
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+	}
+	return he
+}
+
+// handlerErrorFromDeprecatedProto converts the deprecated HandlerError proto into a
+// *nexus.HandlerError, mapping the retry behavior enum and carrying the proto failure as the
+// cause. An unspecified retry behavior is left as the zero value.
+func handlerErrorFromDeprecatedProto(he *nexuspb.HandlerError) *nexus.HandlerError {
+	var retryBehavior nexus.HandlerErrorRetryBehavior
+	//nolint:exhaustive // unspecified is the default
+	switch he.GetRetryBehavior() {
+	case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE:
+		retryBehavior = nexus.HandlerErrorRetryBehaviorRetryable
+	case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE:
+		retryBehavior = nexus.HandlerErrorRetryBehaviorNonRetryable
+	}
+	//nolint:staticcheck // Deprecated function still in use for backward compatibility.
+	cause := ProtoFailureToNexusFailure(he.GetFailure())
+	return &nexus.HandlerError{
+		//nolint:staticcheck // Deprecated function still in use for backward compatibility.
+		Type:          nexus.HandlerErrorType(he.GetErrorType()),
+		RetryBehavior: retryBehavior,
+		Cause:         &nexus.FailureError{Failure: cause},
+	}
+}
+
+// operationErrorFromTemporalFailure converts a Temporal failure from a StartOperationResponse into
+// a *nexus.OperationError. The state is canceled when the failure carries CanceledFailureInfo and
+// failed otherwise. If any conversion step fails, a generic internal *nexus.HandlerError is
+// returned instead.
+func operationErrorFromTemporalFailure(failure *failurepb.Failure) error {
+	state := nexus.OperationStateFailed
+	if failure.GetCanceledFailureInfo() != nil {
+		state = nexus.OperationStateCanceled
+	}
+	nf, err := TemporalFailureToNexusFailure(failure)
+	if err != nil {
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+	}
+	cause, err := nexusrpc.DefaultFailureConverter().FailureToError(nf)
+	if err != nil {
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+	}
+	opErr := &nexus.OperationError{
+		State:   state,
+		Message: "operation error",
+		Cause:   cause,
+	}
+	if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
+		return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
+	}
+	return opErr
+}
diff --git a/go.mod b/go.mod
index b217171341..428006efc4 100644
--- a/go.mod
+++ b/go.mod
@@ -33,7 +33,7 @@ require (
 	github.com/lib/pq v1.10.9
 	github.com/maruel/panicparse/v2 v2.4.0
 	github.com/mitchellh/mapstructure v1.5.0
-	github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5
+	github.com/nexus-rpc/sdk-go v0.6.0
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/olivere/elastic/v7 v7.0.32
 	github.com/pkg/errors v0.9.1
@@ -174,4 +174,4 @@ require (
 	modernc.org/memory v1.11.0 // indirect
 )
 
-replace go.temporal.io/api => github.com/temporalio/api-go v1.62.2-0.20260313212811-d44912090759
+replace go.temporal.io/api => github.com/temporalio/api-go v1.62.2-0.20260314000959-bbb2a94130c3
diff --git a/go.sum b/go.sum
index e9d2b357fa..bc007fa0c3 100644
--- a/go.sum
+++ b/go.sum
@@ -236,8 +236,8 @@ github.com/munnerz/goautoneg
v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5 h1:Van9KGGs8lcDgxzSNFbDhEMNeJ80TbBxwZ45f9iBk9U= -github.com/nexus-rpc/sdk-go v0.5.2-0.20260211051645-26b0b4c584e5/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= +github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= +github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -310,8 +310,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/temporalio/api-go v1.62.2-0.20260313212811-d44912090759 h1:CSlBGjKIgi770YWTYB1dt2AJuLKU6yArSZL636UStdo= -github.com/temporalio/api-go v1.62.2-0.20260313212811-d44912090759/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +github.com/temporalio/api-go v1.62.2-0.20260314000959-bbb2a94130c3 h1:8T6S/2+0jCL//uKnmwQsH7W+O1VnkonSMuzpfR+AZe8= +github.com/temporalio/api-go v1.62.2-0.20260314000959-bbb2a94130c3/go.mod h1:ucB3ZO5X2AFLJcUBzOrio08zxiQjuzdM/7aRKOEQPEc= github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7 
h1:lEebX/hZss+TSH3EBwhztnBavJVj7pWGJOH8UgKHS0w= github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4= github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb h1:YzHH/U/dN7vMP+glybzcXRTczTrgfdRisNTzAj7La04= diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index 48432e45fb..c1b3591818 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -447,115 +447,42 @@ func (h *nexusHandler) StartOperation( oc.logger.Error("received error from matching service for Nexus StartOperation request", tag.Error(err)) return nil, commonnexus.ConvertGRPCError(err, false) } - // Convert to standard Nexus SDK response. - switch t := response.GetOutcome().(type) { - case *matchingservice.DispatchNexusTaskResponse_Failure: - // Set the failure source to "worker" if we've reached this case. - // Failure conversions errors below are the user's fault, as it implies that malformed completions were sent from - // the worker. + // Convert to standard Nexus SDK response and check for errors. 
+ nexusErr := commonnexus.DispatchNexusTaskResponseToError(response) + if nexusErr != nil { oc.setFailureSource(commonnexus.FailureSourceWorker) - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.Failure.GetNexusHandlerFailureInfo().GetType())) - nf, err := commonnexus.TemporalFailureToNexusFailure(t.Failure) - if err != nil { - oc.logger.Error("error converting Temporal failure to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } - he, err := nexusrpc.DefaultFailureConverter().FailureToError(nf) - if err != nil { - oc.logger.Error("error converting Nexus failure to Nexus HandlerError", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") + oc.metricsHandler = oc.metricsHandler.WithTags(outcomeTagForNexusError(nexusErr)) + return nil, nexusErr + } + + // Success path: extract the result from the StartOperation response. + startOp := response.GetResponse().GetStartOperation() + switch t := startOp.GetVariant().(type) { + case *nexuspb.StartOperationResponse_SyncSuccess: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("sync_success")) + links := parseLinks(t.SyncSuccess.GetLinks(), oc.logger) + nexus.AddHandlerLinks(ctx, links...) + return &nexus.HandlerStartOperationResultSync[any]{ + Value: t.SyncSuccess.GetPayload(), + }, nil + + case *nexuspb.StartOperationResponse_AsyncSuccess: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("async_success")) + token := t.AsyncSuccess.GetOperationToken() + if token == "" { + token = t.AsyncSuccess.GetOperationId() } - return nil, he - - case *matchingservice.DispatchNexusTaskResponse_HandlerError: - // Deprecated case. 
Replaced with DispatchNexusTaskResponse_Failure - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:" + t.HandlerError.GetErrorType())) - oc.setFailureSource(commonnexus.FailureSourceWorker) - err := convertOutcomeToNexusHandlerError(t) - return nil, err - - case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_timeout")) + links := parseLinks(t.AsyncSuccess.GetLinks(), oc.logger) + nexus.AddHandlerLinks(ctx, links...) + return &nexus.HandlerStartOperationResultAsync{ + OperationToken: token, + }, nil + + default: + oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:EMPTY_OUTCOME")) oc.setFailureSource(commonnexus.FailureSourceWorker) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") - - case *matchingservice.DispatchNexusTaskResponse_Response: - switch t := t.Response.GetStartOperation().GetVariant().(type) { - case *nexuspb.StartOperationResponse_SyncSuccess: - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("sync_success")) - links := parseLinks(t.SyncSuccess.GetLinks(), oc.logger) - nexus.AddHandlerLinks(ctx, links...) - return &nexus.HandlerStartOperationResultSync[any]{ - Value: t.SyncSuccess.GetPayload(), - }, nil - - case *nexuspb.StartOperationResponse_AsyncSuccess: - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("async_success")) - token := t.AsyncSuccess.GetOperationToken() - if token == "" { - token = t.AsyncSuccess.GetOperationId() - } - links := parseLinks(t.AsyncSuccess.GetLinks(), oc.logger) - nexus.AddHandlerLinks(ctx, links...) 
- return &nexus.HandlerStartOperationResultAsync{ - OperationToken: token, - }, nil - - case *nexuspb.StartOperationResponse_OperationError: - oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("operation_error")) - oc.setFailureSource(commonnexus.FailureSourceWorker) - opErr := &nexus.OperationError{ - Message: "operation error", - // nolint:staticcheck // Deprecated function still in use for backward compatibility. - State: nexus.OperationState(t.OperationError.GetOperationState()), - Cause: &nexus.FailureError{ - // nolint:staticcheck // Deprecated function still in use for backward compatibility. - Failure: commonnexus.ProtoFailureToNexusFailure(t.OperationError.GetFailure()), - }, - } - if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil { - oc.logger.Error("error converting OperationError to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } - return nil, opErr - - case *nexuspb.StartOperationResponse_Failure: - // Set the failure source to "worker" if we've reached this case. - // Failure conversions errors below are the user's fault, as it implies that malformed completions were sent from - // the worker. 
- oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("failure")) - oc.setFailureSource(commonnexus.FailureSourceWorker) - nf, err := commonnexus.TemporalFailureToNexusFailure(t.Failure) - if err != nil { - oc.logger.Error("error converting Temporal failure to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } - cause, err := nexusrpc.DefaultFailureConverter().FailureToError(nf) - if err != nil { - oc.logger.Error("error converting Nexus failure to Nexus OperationError", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } - state := nexus.OperationStateFailed - if t.Failure.GetCanceledFailureInfo() != nil { - state = nexus.OperationStateCanceled - } - opErr := &nexus.OperationError{ - State: state, - Message: "operation error", - Cause: cause, - } - if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil { - oc.logger.Error("error converting OperationError to Nexus failure", tag.Error(err), tag.Operation(operation), tag.WorkflowNamespace(oc.namespaceName)) - return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") - } - return nil, opErr - } + return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome") } - // This is the worker's fault. 
-	oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_error:EMPTY_OUTCOME"))
-	oc.setFailureSource(commonnexus.FailureSourceWorker)
-
-	return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty outcome")
 }
 
 func parseLinks(links []*nexuspb.Link, logger log.Logger) []nexus.Link {
@@ -819,6 +746,34 @@ func convertOutcomeToNexusHandlerError(resp *matchingservice.DispatchNexusTaskRe
 	}
 }
 
+// outcomeTagForNexusError returns the metrics OutcomeTag for an error produced while converting a
+// matching response into a Nexus SDK error.
+//
+// The tag is chosen from the outermost Nexus error in the chain. Both *nexus.HandlerError and
+// *nexus.OperationError unwrap to their Cause, so probing the whole chain with errors.As for one
+// type before the other could tag an operation error caused by a handler error as a handler
+// error (or vice versa).
+//
+// Tags returned:
+//   - "operation_error": the outermost Nexus error is a *nexus.OperationError
+//   - "handler_timeout": the outermost Nexus error is an upstream-timeout *nexus.HandlerError
+//   - "handler_error:" followed by the handler error type: any other outermost *nexus.HandlerError
+//   - "handler_error:UNKNOWN": the chain contains neither error type
+func outcomeTagForNexusError(nexusErr error) metrics.Tag {
+	for err := nexusErr; err != nil; err = errors.Unwrap(err) {
+		switch e := err.(type) {
+		case *nexus.OperationError:
+			return metrics.OutcomeTag("operation_error")
+		case *nexus.HandlerError:
+			if e.Type == nexus.HandlerErrorTypeUpstreamTimeout {
+				return metrics.OutcomeTag("handler_timeout")
+			}
+			return metrics.OutcomeTag("handler_error:" + string(e.Type))
+		}
+	}
+	return metrics.OutcomeTag("handler_error:UNKNOWN")
+}
+
 func (nc *nexusContext) setFailureSource(source string) {
 	nc.responseHeadersMutex.Lock()
 	defer nc.responseHeadersMutex.Unlock()
diff --git a/service/history/outbound_queue_active_task_executor.go b/service/history/outbound_queue_active_task_executor.go
index bc3af18822..241a127110 100644
--- a/service/history/outbound_queue_active_task_executor.go
+++ b/service/history/outbound_queue_active_task_executor.go
@@ -10,6 +10,8 @@ import (
 	"go.temporal.io/server/common/debug"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/resource"
+	"go.temporal.io/server/service/history/configs"
 	"go.temporal.io/server/service/history/consts"
 	historyi "go.temporal.io/server/service/history/interfaces"
 	"go.temporal.io/server/service/history/queues"
@@ -24,7 +26,8 @@ const (
 type outboundQueueActiveTaskExecutor struct {
 	stateMachineEnvironment
 
-	chasmEngine chasm.Engine
+	chasmEngine                  chasm.Engine
+	workerCommandsTaskDispatcher
*workerCommandsTaskDispatcher } var _ queues.Executor = &outboundQueueActiveTaskExecutor{} @@ -35,17 +38,26 @@ func newOutboundQueueActiveTaskExecutor( logger log.Logger, metricsHandler metrics.Handler, chasmEngine chasm.Engine, + matchingRawClient resource.MatchingRawClient, + config *configs.Config, ) *outboundQueueActiveTaskExecutor { + scopedMetricsHandler := metricsHandler.WithTags( + metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope), + ) return &outboundQueueActiveTaskExecutor{ stateMachineEnvironment: stateMachineEnvironment{ - shardContext: shardCtx, - cache: workflowCache, - logger: logger, - metricsHandler: metricsHandler.WithTags( - metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope), - ), + shardContext: shardCtx, + cache: workflowCache, + logger: logger, + metricsHandler: scopedMetricsHandler, }, chasmEngine: chasmEngine, + workerCommandsTaskDispatcher: newWorkerCommandsTaskDispatcher( + matchingRawClient, + config, + scopedMetricsHandler, + logger, + ), } } @@ -92,6 +104,8 @@ func (e *outboundQueueActiveTaskExecutor) Execute( return respond(e.executeStateMachineTask(ctx, task)) case *tasks.ChasmTask: return respond(e.executeChasmSideEffectTask(ctx, task)) + case *tasks.WorkerCommandsTask: + return respond(e.workerCommandsTaskDispatcher.execute(ctx, task)) } return respond(queueserrors.NewUnprocessableTaskError(fmt.Sprintf("unknown task type '%T'", task))) diff --git a/service/history/outbound_queue_active_task_executor_test.go b/service/history/outbound_queue_active_task_executor_test.go index 511b4e2810..dfcc860973 100644 --- a/service/history/outbound_queue_active_task_executor_test.go +++ b/service/history/outbound_queue_active_task_executor_test.go @@ -115,6 +115,8 @@ func (s *outboundQueueActiveTaskExecutorSuite) SetupTest() { s.logger, s.metricsHandler, s.mockChasmEngine, + nil, // matchingRawClient - not used in these tests + nil, // config - not used in these tests ) } diff --git 
a/service/history/outbound_queue_factory.go b/service/history/outbound_queue_factory.go
index 77dffccef6..9dcd094405 100644
--- a/service/history/outbound_queue_factory.go
+++ b/service/history/outbound_queue_factory.go
@@ -10,6 +10,7 @@ import (
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/quotas"
+	"go.temporal.io/server/common/resource"
 	ctasks "go.temporal.io/server/common/tasks"
 	"go.temporal.io/server/common/telemetry"
 	"go.temporal.io/server/service/history/circuitbreakerpool"
@@ -31,6 +32,7 @@ type outboundQueueFactoryParams struct {
 	QueueFactoryBaseParams
 
 	CircuitBreakerPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
+	MatchingRawClient  resource.MatchingRawClient
 }
 
 type groupLimiter struct {
@@ -227,6 +229,8 @@ func (f *outboundQueueFactory) CreateQueue(
 		logger,
 		metricsHandler,
 		f.ChasmEngine,
+		f.MatchingRawClient,
+		shardContext.GetConfig(),
 	)
 
 	standbyExecutor := newOutboundQueueStandbyTaskExecutor(
diff --git a/service/history/worker_commands_task_dispatcher.go b/service/history/worker_commands_task_dispatcher.go
new file mode 100644
index 0000000000..c0c3a39ac6
--- /dev/null
+++ b/service/history/worker_commands_task_dispatcher.go
@@ -0,0 +1,189 @@
+package history
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/nexus-rpc/sdk-go/nexus"
+	enumspb "go.temporal.io/api/enums/v1"
+	nexuspb "go.temporal.io/api/nexus/v1"
+	workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1"
+	taskqueuepb "go.temporal.io/api/taskqueue/v1"
+	"go.temporal.io/server/api/matchingservice/v1"
+	"go.temporal.io/server/common/debug"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/log/tag"
+	"go.temporal.io/server/common/metrics"
+	commonnexus "go.temporal.io/server/common/nexus"
+	"go.temporal.io/server/common/payload"
+	"go.temporal.io/server/common/resource"
+	"go.temporal.io/server/service/history/configs"
+	"go.temporal.io/server/service/history/tasks"
+)
+
+const (
+	// workerCommandsTaskTimeout bounds a single dispatch attempt, including the time matching
+	// spends waiting for a worker to pick up and answer the request.
+	workerCommandsTaskTimeout = time.Second * 10 * debug.TimeoutMultiplier
+)
+
+// workerCommandsTaskDispatcher dispatches worker commands to workers via Nexus.
+//
+// Failure scenarios:
+//   - No worker polling: matching returns RequestTimeout → *nexus.HandlerError{Type: UpstreamTimeout}.
+//     Retryable — worker may come up later.
+//   - Worker crashes after receiving the task: matching blocks waiting for a response until
+//     context deadline, then returns RequestTimeout. Indistinguishable from "no worker polling".
+//     Safe to retry because commands are idempotent (e.g., cancelling a missing activity is a
+//     no-op success per the worker contract).
+//   - Transport/RPC failure: *nexus.HandlerError. Retryable.
+//   - Operation failure (worker explicitly returns error): *nexus.OperationError. Permanent —
+//     the worker contract requires success for all defined commands, so this indicates a bug
+//     or version incompatibility.
+//
+// TODO: Add a worker-commands-specific retry cap (e.g., 5 attempts) instead of relying on the
+// shared outbound queue HistoryTaskDLQUnexpectedErrorAttempts (default 70). These commands are
+// best-effort with heartbeat timeout as fallback, so excessive retries waste resources.
+type workerCommandsTaskDispatcher struct {
+	matchingRawClient resource.MatchingRawClient
+	config            *configs.Config
+	metricsHandler    metrics.Handler
+	logger            log.Logger
+}
+
+// newWorkerCommandsTaskDispatcher returns a dispatcher that sends worker commands through
+// matchingRawClient, gated by config and reporting to metricsHandler and logger.
+func newWorkerCommandsTaskDispatcher(
+	matchingRawClient resource.MatchingRawClient,
+	config *configs.Config,
+	metricsHandler metrics.Handler,
+	logger log.Logger,
+) *workerCommandsTaskDispatcher {
+	return &workerCommandsTaskDispatcher{
+		matchingRawClient: matchingRawClient,
+		config:            config,
+		metricsHandler:    metricsHandler,
+		logger:            logger,
+	}
+}
+
+// execute dispatches the task's commands to the worker's control queue. It is a no-op when the
+// EnableCancelActivityWorkerCommand config is off or the task carries no commands. Each attempt
+// is bounded by workerCommandsTaskTimeout.
+func (d *workerCommandsTaskDispatcher) execute(
+	ctx context.Context,
+	task *tasks.WorkerCommandsTask,
+) error {
+	if !d.config.EnableCancelActivityWorkerCommand() {
+		return nil
+	}
+
+	if len(task.Commands) == 0 {
+		return nil
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, workerCommandsTaskTimeout)
+	defer cancel()
+
+	return d.dispatchToWorker(ctx, task)
+}
+
+// dispatchToWorker wraps the task's commands in a Nexus StartOperation request for the worker
+// service's ExecuteCommands operation and sends it to task.Destination through matching.
+//
+// RPC errors are returned as-is; a non-nil Nexus error in the response is classified by
+// handleError.
+func (d *workerCommandsTaskDispatcher) dispatchToWorker(
+	ctx context.Context,
+	task *tasks.WorkerCommandsTask,
+) error {
+	request := &workerservicepb.ExecuteCommandsRequest{
+		Commands: task.Commands,
+	}
+	requestPayload, err := payload.Encode(request)
+	if err != nil {
+		return fmt.Errorf("failed to encode worker commands request: %w", err)
+	}
+
+	nexusRequest := &nexuspb.Request{
+		Header: map[string]string{},
+		Variant: &nexuspb.Request_StartOperation{
+			StartOperation: &nexuspb.StartOperationRequest{
+				Service:   workerservicepb.WorkerService.ServiceName,
+				Operation: workerservicepb.WorkerService.ExecuteCommands.Name(),
+				Payload:   requestPayload,
+			},
+		},
+	}
+
+	resp, err := d.matchingRawClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{
+		NamespaceId: task.NamespaceID,
+		TaskQueue: &taskqueuepb.TaskQueue{
+			Name: task.Destination,
+			Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
+		},
+		Request: nexusRequest,
+	})
+	if err != nil {
+		d.logger.Warn("Failed to dispatch worker commands",
+			tag.NewStringTag("control_queue", task.Destination),
+			tag.Error(err))
+		metrics.WorkerCommandsDispatchFailure.With(d.metricsHandler).Record(1)
+		return err
+	}
+
+	nexusErr := commonnexus.DispatchNexusTaskResponseToError(resp)
+	if nexusErr == nil {
+		metrics.WorkerCommandsDispatchSuccess.With(d.metricsHandler).Record(1)
+		return nil
+	}
+
+	return d.handleError(nexusErr, task.Destination)
+}
+
+// handleError classifies a Nexus error returned for a dispatched worker-commands request, records
+// the matching metric, and decides whether the task should be retried.
+//
+// Classification uses the outermost Nexus error in the chain. Both *nexus.OperationError and
+// *nexus.HandlerError unwrap to their Cause, so probing the whole chain with errors.As could
+// treat a handler error that merely wraps an operation error as a permanent operation failure
+// and silently drop the task.
+//
+// It returns nil for operation failures (permanent, not retried) and nexusErr otherwise.
+func (d *workerCommandsTaskDispatcher) handleError(nexusErr error, controlQueue string) error {
+	for err := nexusErr; err != nil; err = errors.Unwrap(err) {
+		switch e := err.(type) {
+		case *nexus.OperationError:
+			// Operation-level failure: the worker received and processed the request but
+			// returned an error. Permanent — the worker contract requires success for all
+			// defined commands, so this indicates a bug or version incompatibility. Retrying
+			// won't help.
+			d.logger.Error("Worker returned operation failure for worker commands",
+				tag.NewStringTag("control_queue", controlQueue),
+				tag.Error(nexusErr))
+			metrics.WorkerCommandsOperationFailure.With(d.metricsHandler).Record(1)
+			return nil
+		case *nexus.HandlerError:
+			if e.Type == nexus.HandlerErrorTypeUpstreamTimeout {
+				d.logger.Warn("No worker polling control queue",
+					tag.NewStringTag("control_queue", controlQueue))
+				metrics.WorkerCommandsDispatchNoPoller.With(d.metricsHandler).Record(1)
+				return nexusErr
+			}
+
+			d.logger.Warn("Worker commands transport failure",
+				tag.NewStringTag("control_queue", controlQueue),
+				tag.Error(nexusErr))
+			metrics.WorkerCommandsDispatchFailure.With(d.metricsHandler).Record(1)
+			return nexusErr
+		}
+	}
+
+	d.logger.Warn("Worker commands unexpected error",
+		tag.NewStringTag("control_queue", controlQueue),
+		tag.Error(nexusErr))
+	metrics.WorkerCommandsDispatchFailure.With(d.metricsHandler).Record(1)
+	return nexusErr
+}
diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go
new file mode 100644
index 0000000000..835537c742
--- /dev/null
+++ b/tests/worker_commands_task_test.go
@@ -0,0 +1,149
@@
+package tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	commandpb "go.temporal.io/api/command/v1"
+	commonpb "go.temporal.io/api/common/v1"
+	enumspb "go.temporal.io/api/enums/v1"
+	workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1"
+	taskqueuepb "go.temporal.io/api/taskqueue/v1"
+	"go.temporal.io/api/workflowservice/v1"
+	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/tests/testcore"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+// TestDispatchCancelToWorker tests that when an activity cancellation is requested,
+// the server dispatches a WorkerCommandsTask to the worker's control queue via Nexus.
+func TestDispatchCancelToWorker(t *testing.T) {
+	env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
+	defer cancel()
+
+	tv := env.Tv()
+	poller := env.TaskPoller()
+
+	// Get the control queue name from test vars
+	controlQueueName := tv.ControlQueueName(env.Namespace().String())
+	t.Logf("WorkerInstanceKey: %s", tv.WorkerInstanceKey())
+	t.Logf("ControlQueueName: %s", controlQueueName)
+
+	// Start the workflow
+	startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
+		RequestId:                tv.Any().String(),
+		Namespace:                env.Namespace().String(),
+		WorkflowId:               tv.WorkflowID(),
+		WorkflowType:             tv.WorkflowType(),
+		TaskQueue:                tv.TaskQueue(),
+		WorkflowExecutionTimeout: durationpb.New(60 * time.Second),
+		WorkflowTaskTimeout:      durationpb.New(10 * time.Second),
+	})
+	env.NoError(err)
+	t.Logf("Started workflow: %s/%s", tv.WorkflowID(), startResp.RunId)
+
+	// Poll and complete first workflow task - schedule the activity
+	_, err = poller.PollAndHandleWorkflowTask(tv,
+		func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
+			return &workflowservice.RespondWorkflowTaskCompletedRequest{
+				Commands: []*commandpb.Command{
+					{
+						CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
+						Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
+							ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
+								ActivityId:             tv.ActivityID(),
+								ActivityType:           tv.ActivityType(),
+								TaskQueue:              tv.TaskQueue(),
+								ScheduleToCloseTimeout: durationpb.New(60 * time.Second),
+								StartToCloseTimeout:    durationpb.New(60 * time.Second),
+							},
+						},
+					},
+				},
+			}, nil
+		})
+	env.NoError(err)
+	t.Log("Scheduled activity")
+
+	// Poll for activity task and start running the activity.
+	activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
+		Namespace:              env.Namespace().String(),
+		TaskQueue:              tv.TaskQueue(),
+		Identity:               tv.WorkerIdentity(),
+		WorkerInstanceKey:      tv.WorkerInstanceKey(),
+		WorkerControlTaskQueue: controlQueueName,
+	})
+	env.NoError(err)
+	env.NotNil(activityPollResp)
+	env.NotEmpty(activityPollResp.TaskToken)
+	t.Log("Activity started with WorkerInstanceKey")
+
+	// Request workflow cancellation
+	t.Log("Requesting workflow cancellation...")
+	_, err = env.FrontendClient().RequestCancelWorkflowExecution(ctx, &workflowservice.RequestCancelWorkflowExecutionRequest{
+		Namespace: env.Namespace().String(),
+		WorkflowExecution: &commonpb.WorkflowExecution{
+			WorkflowId: tv.WorkflowID(),
+			RunId:      startResp.RunId,
+		},
+	})
+	env.NoError(err)
+
+	// Simulate what the SDK does when a workflow is cancelled.
+	// Poll and complete the workflow task with RequestCancelActivityTask command.
+	// This sets CancelRequested=true and triggers the dispatch of a WorkerCommandsTask.
+	_, err = poller.PollAndHandleWorkflowTask(tv,
+		func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
+			// Find the scheduled event ID (getters keep this safe if the task carries no history).
+			var scheduledEventID int64
+			for _, event := range task.GetHistory().GetEvents() {
+				if event.GetEventType() == enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
+					scheduledEventID = event.GetEventId()
+					break
+				}
+			}
+			return &workflowservice.RespondWorkflowTaskCompletedRequest{
+				Commands: []*commandpb.Command{
+					{
+						CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
+						Attributes: &commandpb.Command_RequestCancelActivityTaskCommandAttributes{
+							RequestCancelActivityTaskCommandAttributes: &commandpb.RequestCancelActivityTaskCommandAttributes{
+								ScheduledEventId: scheduledEventID,
+							},
+						},
+					},
+				},
+			}, nil
+		})
+	env.NoError(err)
+	t.Log("Workflow task completed with RequestCancelActivityTask command")
+
+	// Poll the Nexus control queue until the request arrives; the wait stays inside ctx's 90s deadline.
+	var nexusPollResp *workflowservice.PollNexusTaskQueueResponse
+	env.Eventually(func() bool {
+		pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second)
+		defer pollCancel()
+		resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{
+			Namespace: env.Namespace().String(),
+			TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
+			Identity:  tv.WorkerIdentity(),
+		})
+		if err == nil && resp != nil && resp.Request != nil {
+			nexusPollResp = resp
+			return true
+		}
+		return false
+	}, 60*time.Second, 100*time.Millisecond, "Timed out waiting for Nexus task")
+
+	// Verify we received the notification request on the control queue
+	env.NotNil(nexusPollResp.GetRequest(), "Expected to receive Nexus request on control queue")
+
+	startOp := nexusPollResp.GetRequest().GetStartOperation()
+	env.NotNil(startOp, "Expected StartOperation in Nexus request")
+	env.Equal(workerservicepb.WorkerService.ServiceName, startOp.Service, "Expected WorkerService")
+	env.Equal(workerservicepb.WorkerService.ExecuteCommands.Name(), startOp.Operation, "Expected ExecuteCommands operation")
+	t.Log("SUCCESS: Received ExecuteCommands Nexus request on control queue")
+}