Skip to content

Commit a35281a

Browse files
committed
Add functional test for NotifyActivityTask dispatch
1 parent 8ca6401 commit a35281a

1 file changed

Lines changed: 148 additions & 0 deletions

File tree

tests/notify_activity_task_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
commandpb "go.temporal.io/api/command/v1"
9+
commonpb "go.temporal.io/api/common/v1"
10+
enumspb "go.temporal.io/api/enums/v1"
11+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
12+
"go.temporal.io/api/workflowservice/v1"
13+
"go.temporal.io/server/common/dynamicconfig"
14+
"go.temporal.io/server/tests/testcore"
15+
"google.golang.org/protobuf/types/known/durationpb"
16+
)
17+
18+
// TestDispatchCancelToWorker tests that when an activity cancellation is requested,
19+
// the server dispatches a NotifyActivityTask to the worker's control queue via Nexus.
20+
func TestDispatchCancelToWorker(t *testing.T) {
21+
env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableActivityCancellationNexusTask, true))
22+
23+
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
24+
defer cancel()
25+
26+
tv := env.Tv()
27+
poller := env.TaskPoller()
28+
29+
// Get the control queue name from test vars
30+
controlQueueName := tv.ControlQueueName(env.Namespace().String())
31+
t.Logf("WorkerInstanceKey: %s", tv.WorkerInstanceKey())
32+
t.Logf("ControlQueueName: %s", controlQueueName)
33+
34+
// Start the workflow
35+
startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
36+
RequestId: tv.Any().String(),
37+
Namespace: env.Namespace().String(),
38+
WorkflowId: tv.WorkflowID(),
39+
WorkflowType: tv.WorkflowType(),
40+
TaskQueue: tv.TaskQueue(),
41+
WorkflowExecutionTimeout: durationpb.New(60 * time.Second),
42+
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
43+
})
44+
env.NoError(err)
45+
t.Logf("Started workflow: %s/%s", tv.WorkflowID(), startResp.RunId)
46+
47+
// Poll and complete first workflow task - schedule the activity
48+
_, err = poller.PollAndHandleWorkflowTask(tv,
49+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
50+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
51+
Commands: []*commandpb.Command{
52+
{
53+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
54+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
55+
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
56+
ActivityId: tv.ActivityID(),
57+
ActivityType: tv.ActivityType(),
58+
TaskQueue: tv.TaskQueue(),
59+
ScheduleToCloseTimeout: durationpb.New(60 * time.Second),
60+
StartToCloseTimeout: durationpb.New(60 * time.Second),
61+
},
62+
},
63+
},
64+
},
65+
}, nil
66+
})
67+
env.NoError(err)
68+
t.Log("Scheduled activity")
69+
70+
// Poll for activity task and start running the activity.
71+
activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
72+
Namespace: env.Namespace().String(),
73+
TaskQueue: tv.TaskQueue(),
74+
Identity: tv.WorkerIdentity(),
75+
WorkerInstanceKey: tv.WorkerInstanceKey(),
76+
WorkerControlTaskQueue: controlQueueName,
77+
})
78+
env.NoError(err)
79+
env.NotNil(activityPollResp)
80+
env.NotEmpty(activityPollResp.TaskToken)
81+
t.Log("Activity started with WorkerInstanceKey")
82+
83+
// Request workflow cancellation
84+
t.Log("Requesting workflow cancellation...")
85+
_, err = env.FrontendClient().RequestCancelWorkflowExecution(ctx, &workflowservice.RequestCancelWorkflowExecutionRequest{
86+
Namespace: env.Namespace().String(),
87+
WorkflowExecution: &commonpb.WorkflowExecution{
88+
WorkflowId: tv.WorkflowID(),
89+
RunId: startResp.RunId,
90+
},
91+
})
92+
env.NoError(err)
93+
94+
// Simulate what the SDK does when a workflow is cancelled.
95+
// Poll and complete the workflow task with RequestCancelActivityTask command.
96+
// This sets CancelRequested=true and triggers the dispatch of NotifyActivityTask.
97+
_, err = poller.PollAndHandleWorkflowTask(tv,
98+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
99+
// Find the scheduled event ID
100+
var scheduledEventID int64
101+
for _, event := range task.History.Events {
102+
if event.EventType == enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
103+
scheduledEventID = event.EventId
104+
break
105+
}
106+
}
107+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
108+
Commands: []*commandpb.Command{
109+
{
110+
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
111+
Attributes: &commandpb.Command_RequestCancelActivityTaskCommandAttributes{
112+
RequestCancelActivityTaskCommandAttributes: &commandpb.RequestCancelActivityTaskCommandAttributes{
113+
ScheduledEventId: scheduledEventID,
114+
},
115+
},
116+
},
117+
},
118+
}, nil
119+
})
120+
env.NoError(err)
121+
t.Log("Workflow task completed with RequestCancelActivityTask command")
122+
123+
// Poll Nexus control queue until we receive the notification request
124+
var nexusPollResp *workflowservice.PollNexusTaskQueueResponse
125+
env.Eventually(func() bool {
126+
pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second)
127+
defer pollCancel()
128+
resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{
129+
Namespace: env.Namespace().String(),
130+
TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
131+
Identity: tv.WorkerIdentity(),
132+
})
133+
if err == nil && resp != nil && resp.Request != nil {
134+
nexusPollResp = resp
135+
return true
136+
}
137+
return false
138+
}, 120*time.Second, 100*time.Millisecond, "Timed out waiting for Nexus task")
139+
140+
// Verify we received the notification request on the control queue
141+
env.NotNil(nexusPollResp.Request, "Expected to receive Nexus request on control queue")
142+
143+
startOp := nexusPollResp.Request.GetStartOperation()
144+
env.NotNil(startOp, "Expected StartOperation in Nexus request")
145+
env.Equal("temporal.api.worker.v1.WorkerService", startOp.Service, "Expected WorkerService")
146+
env.Equal("notify-activity", startOp.Operation, "Expected notify-activity operation")
147+
t.Logf("SUCCESS: Received notify-activity Nexus request on control queue")
148+
}

0 commit comments

Comments
 (0)