Skip to content

Commit d8f33bf

Browse files
committed
Add ActivityCommandTask dispatch via Nexus
- Add activityCommandTaskDispatcher to dispatch tasks to workers
- Use typed Nexus service definition for worker communication
- Add proper error handling for DispatchNexusTask response
- Release workflow lock before making RPC call
- Add functional test for dispatch flow
1 parent d6af322 commit d8f33bf

5 files changed

Lines changed: 396 additions & 7 deletions
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package history
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
enumspb "go.temporal.io/api/enums/v1"
9+
nexuspb "go.temporal.io/api/nexus/v1"
10+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
11+
workerpb "go.temporal.io/api/worker/v1"
12+
enumsspb "go.temporal.io/server/api/enums/v1"
13+
"go.temporal.io/server/api/matchingservice/v1"
14+
tokenspb "go.temporal.io/server/api/token/v1"
15+
"go.temporal.io/server/common"
16+
"go.temporal.io/server/common/debug"
17+
"go.temporal.io/server/common/log"
18+
"go.temporal.io/server/common/log/tag"
19+
"go.temporal.io/server/common/metrics"
20+
"go.temporal.io/server/common/payload"
21+
"go.temporal.io/server/common/resource"
22+
"go.temporal.io/server/service/history/configs"
23+
historyi "go.temporal.io/server/service/history/interfaces"
24+
"go.temporal.io/server/service/history/tasks"
25+
wcache "go.temporal.io/server/service/history/workflow/cache"
26+
)
27+
28+
const (
29+
activityCommandTaskTimeout = time.Second * 10 * debug.TimeoutMultiplier
30+
)
31+
32+
// activityCommandTaskDispatcher handles dispatching activity command tasks to workers.
33+
type activityCommandTaskDispatcher struct {
34+
shardContext historyi.ShardContext
35+
cache wcache.Cache
36+
matchingRawClient resource.MatchingRawClient
37+
config *configs.Config
38+
metricsHandler metrics.Handler
39+
logger log.Logger
40+
}
41+
42+
func newActivityCommandTaskDispatcher(
43+
shardContext historyi.ShardContext,
44+
cache wcache.Cache,
45+
matchingRawClient resource.MatchingRawClient,
46+
config *configs.Config,
47+
metricsHandler metrics.Handler,
48+
logger log.Logger,
49+
) *activityCommandTaskDispatcher {
50+
return &activityCommandTaskDispatcher{
51+
shardContext: shardContext,
52+
cache: cache,
53+
matchingRawClient: matchingRawClient,
54+
config: config,
55+
metricsHandler: metricsHandler,
56+
logger: logger,
57+
}
58+
}
59+
60+
func (d *activityCommandTaskDispatcher) execute(
61+
ctx context.Context,
62+
task *tasks.ActivityCommandTask,
63+
) error {
64+
if !d.config.EnableActivityCancellationNexusTask() {
65+
return nil
66+
}
67+
68+
if len(task.ScheduledEventIDs) == 0 {
69+
return nil
70+
}
71+
72+
ctx, cancel := context.WithTimeout(ctx, activityCommandTaskTimeout)
73+
defer cancel()
74+
75+
taskTokens, err := d.buildTaskTokens(ctx, task)
76+
if err != nil {
77+
return err
78+
}
79+
if len(taskTokens) == 0 {
80+
return nil
81+
}
82+
83+
return d.dispatchToWorker(ctx, task, taskTokens)
84+
}
85+
86+
// buildTaskTokens loads mutable state and builds task tokens for activities that need commands.
87+
// Lock is acquired and released within this method.
88+
func (d *activityCommandTaskDispatcher) buildTaskTokens(
89+
ctx context.Context,
90+
task *tasks.ActivityCommandTask,
91+
) ([][]byte, error) {
92+
weContext, release, err := getWorkflowExecutionContextForTask(ctx, d.shardContext, d.cache, task)
93+
if err != nil {
94+
return nil, err
95+
}
96+
defer release(nil)
97+
98+
mutableState, err := weContext.LoadMutableState(ctx, d.shardContext)
99+
if err != nil {
100+
return nil, err
101+
}
102+
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
103+
return nil, nil
104+
}
105+
106+
var taskTokens [][]byte
107+
for _, scheduledEventID := range task.ScheduledEventIDs {
108+
ai, ok := mutableState.GetActivityInfo(scheduledEventID)
109+
if !ok || ai.StartedEventId == common.EmptyEventID {
110+
continue
111+
}
112+
if task.CommandType == enumsspb.ACTIVITY_COMMAND_TYPE_CANCEL && !ai.CancelRequested {
113+
continue
114+
}
115+
116+
taskToken := &tokenspb.Task{
117+
NamespaceId: task.NamespaceID,
118+
WorkflowId: task.WorkflowID,
119+
RunId: task.RunID,
120+
ScheduledEventId: scheduledEventID,
121+
Attempt: ai.Attempt,
122+
ActivityId: ai.ActivityId,
123+
StartedEventId: ai.StartedEventId,
124+
Version: ai.Version,
125+
}
126+
taskTokenBytes, err := taskToken.Marshal()
127+
if err != nil {
128+
return nil, err
129+
}
130+
taskTokens = append(taskTokens, taskTokenBytes)
131+
}
132+
return taskTokens, nil
133+
}
134+
135+
func (d *activityCommandTaskDispatcher) dispatchToWorker(
136+
ctx context.Context,
137+
task *tasks.ActivityCommandTask,
138+
taskTokens [][]byte,
139+
) error {
140+
notificationRequest := &workerpb.ActivityNotificationRequest{
141+
NotificationType: workerpb.ActivityNotificationType(task.CommandType),
142+
TaskTokens: taskTokens,
143+
}
144+
requestPayload, err := payload.Encode(notificationRequest)
145+
if err != nil {
146+
return fmt.Errorf("failed to encode activity command request: %w", err)
147+
}
148+
149+
nexusRequest := &nexuspb.Request{
150+
Header: map[string]string{},
151+
Variant: &nexuspb.Request_StartOperation{
152+
StartOperation: &nexuspb.StartOperationRequest{
153+
Service: workerpb.WorkerService.ServiceName,
154+
Operation: workerpb.WorkerService.NotifyActivity.Name(),
155+
Payload: requestPayload,
156+
},
157+
},
158+
}
159+
160+
resp, err := d.matchingRawClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{
161+
NamespaceId: task.NamespaceID,
162+
TaskQueue: &taskqueuepb.TaskQueue{
163+
Name: task.Destination,
164+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
165+
},
166+
Request: nexusRequest,
167+
})
168+
if err != nil {
169+
d.logger.Warn("Failed to dispatch activity command to worker",
170+
tag.NewStringTag("control_queue", task.Destination),
171+
tag.Error(err))
172+
return err
173+
}
174+
175+
return d.handleDispatchResponse(resp, task.Destination)
176+
}
177+
178+
func (d *activityCommandTaskDispatcher) handleDispatchResponse(
179+
resp *matchingservice.DispatchNexusTaskResponse,
180+
controlQueue string,
181+
) error {
182+
// Check for timeout (no worker polling)
183+
if resp.GetRequestTimeout() != nil {
184+
d.logger.Warn("No worker polling control queue for activity command",
185+
tag.NewStringTag("control_queue", controlQueue))
186+
return fmt.Errorf("no worker polling control queue")
187+
}
188+
189+
// Check for worker handler failure
190+
if failure := resp.GetFailure(); failure != nil {
191+
d.logger.Warn("Worker handler failed for activity command",
192+
tag.NewStringTag("control_queue", controlQueue),
193+
tag.NewStringTag("failure_message", failure.GetMessage()))
194+
return fmt.Errorf("worker handler failed: %s", failure.GetMessage())
195+
}
196+
197+
// Check operation-level response
198+
nexusResp := resp.GetResponse()
199+
if nexusResp == nil {
200+
return nil
201+
}
202+
203+
startOpResp := nexusResp.GetStartOperation()
204+
if startOpResp == nil {
205+
return nil
206+
}
207+
208+
// Check for operation failure (terminal - don't retry)
209+
if opFailure := startOpResp.GetFailure(); opFailure != nil {
210+
d.logger.Warn("Activity command operation failure",
211+
tag.NewStringTag("control_queue", controlQueue),
212+
tag.NewStringTag("failure_message", opFailure.GetMessage()))
213+
return nil
214+
}
215+
216+
return nil
217+
}
218+

service/history/outbound_queue_active_task_executor.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"go.temporal.io/server/common/debug"
1111
"go.temporal.io/server/common/log"
1212
"go.temporal.io/server/common/metrics"
13+
"go.temporal.io/server/common/resource"
14+
"go.temporal.io/server/service/history/configs"
1315
"go.temporal.io/server/service/history/consts"
1416
historyi "go.temporal.io/server/service/history/interfaces"
1517
"go.temporal.io/server/service/history/queues"
@@ -24,7 +26,8 @@ const (
2426

2527
type outboundQueueActiveTaskExecutor struct {
2628
stateMachineEnvironment
27-
chasmEngine chasm.Engine
29+
chasmEngine chasm.Engine
30+
activityCommandTaskDispatcher *activityCommandTaskDispatcher
2831
}
2932

3033
var _ queues.Executor = &outboundQueueActiveTaskExecutor{}
@@ -35,17 +38,28 @@ func newOutboundQueueActiveTaskExecutor(
3538
logger log.Logger,
3639
metricsHandler metrics.Handler,
3740
chasmEngine chasm.Engine,
41+
matchingRawClient resource.MatchingRawClient,
42+
config *configs.Config,
3843
) *outboundQueueActiveTaskExecutor {
44+
scopedMetricsHandler := metricsHandler.WithTags(
45+
metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope),
46+
)
3947
return &outboundQueueActiveTaskExecutor{
4048
stateMachineEnvironment: stateMachineEnvironment{
41-
shardContext: shardCtx,
42-
cache: workflowCache,
43-
logger: logger,
44-
metricsHandler: metricsHandler.WithTags(
45-
metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope),
46-
),
49+
shardContext: shardCtx,
50+
cache: workflowCache,
51+
logger: logger,
52+
metricsHandler: scopedMetricsHandler,
4753
},
4854
chasmEngine: chasmEngine,
55+
activityCommandTaskDispatcher: newActivityCommandTaskDispatcher(
56+
shardCtx,
57+
workflowCache,
58+
matchingRawClient,
59+
config,
60+
scopedMetricsHandler,
61+
logger,
62+
),
4963
}
5064
}
5165

@@ -92,6 +106,8 @@ func (e *outboundQueueActiveTaskExecutor) Execute(
92106
return respond(e.executeStateMachineTask(ctx, task))
93107
case *tasks.ChasmTask:
94108
return respond(e.executeChasmSideEffectTask(ctx, task))
109+
case *tasks.ActivityCommandTask:
110+
return respond(e.activityCommandTaskDispatcher.execute(ctx, task))
95111
}
96112

97113
return respond(queueserrors.NewUnprocessableTaskError(fmt.Sprintf("unknown task type '%T'", task)))

service/history/outbound_queue_active_task_executor_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func (s *outboundQueueActiveTaskExecutorSuite) SetupTest() {
115115
s.logger,
116116
s.metricsHandler,
117117
s.mockChasmEngine,
118+
nil, // matchingRawClient - not used in these tests
119+
nil, // config - not used in these tests
118120
)
119121
}
120122

service/history/outbound_queue_factory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.temporal.io/server/common/metrics"
1111
"go.temporal.io/server/common/namespace"
1212
"go.temporal.io/server/common/quotas"
13+
"go.temporal.io/server/common/resource"
1314
ctasks "go.temporal.io/server/common/tasks"
1415
"go.temporal.io/server/common/telemetry"
1516
"go.temporal.io/server/service/history/circuitbreakerpool"
@@ -31,6 +32,7 @@ type outboundQueueFactoryParams struct {
3132

3233
QueueFactoryBaseParams
3334
CircuitBreakerPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
35+
MatchingRawClient resource.MatchingRawClient
3436
}
3537

3638
type groupLimiter struct {
@@ -227,6 +229,8 @@ func (f *outboundQueueFactory) CreateQueue(
227229
logger,
228230
metricsHandler,
229231
f.ChasmEngine,
232+
f.MatchingRawClient,
233+
shardContext.GetConfig(),
230234
)
231235

232236
standbyExecutor := newOutboundQueueStandbyTaskExecutor(

0 commit comments

Comments
 (0)