Skip to content

Commit 13d8513

Browse files
committed
Add functional test for activity cancellation via control queue
- Add WorkerInstanceKey() and ControlQueueName() to testvars - Auto-populate WorkerInstanceKey in taskpoller - Fix nil Nexus header panic in matching engine - Fix WorkerInstanceKey not forwarded between task queue partitions - Add TestActivityCancelControlTaskDispatch functional test
1 parent 6b0f6ee commit 13d8513

7 files changed

Lines changed: 188 additions & 2 deletions

File tree

common/testing/taskpoller/taskpoller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,9 @@ func (p *activityTaskPoller) pollActivityTask(
557557
if req.Identity == "" {
558558
req.Identity = opts.tv.WorkerIdentity()
559559
}
560+
if req.WorkerInstanceKey == "" {
561+
req.WorkerInstanceKey = opts.tv.WorkerInstanceKey()
562+
}
560563
resp, err := p.client.PollActivityTaskQueue(ctx, req)
561564
if err != nil {
562565
return nil, err

common/testing/testvars/test_vars.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,15 @@ func (tv *TestVars) WorkerIdentity() string {
396396
return getOrCreate(tv, "worker_identity", tv.uniqueString, tv.stringNSetter)
397397
}
398398

399+
func (tv *TestVars) WorkerInstanceKey() string {
400+
return getOrCreate(tv, "worker_instance_key", tv.uniqueString, tv.stringNSetter)
401+
}
402+
403+
// ControlQueueName returns the queue name used to deliver Nexus tasks to this worker instance.
404+
func (tv *TestVars) ControlQueueName(namespace string) string {
405+
return fmt.Sprintf("/temporal-sys/worker-commands/%s/%s-nexus-queue", namespace, tv.WorkerInstanceKey())
406+
}
407+
399408
func (tv *TestVars) TimerID() string {
400409
return getOrCreate(tv, "timer_id", tv.uniqueString, tv.stringNSetter)
401410
}

service/history/transfer_queue_active_task_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66

77
"github.com/google/uuid"
8-
"google.golang.org/protobuf/proto"
98
commonpb "go.temporal.io/api/common/v1"
109
deploymentpb "go.temporal.io/api/deployment/v1"
1110
enumspb "go.temporal.io/api/enums/v1"
@@ -51,6 +50,7 @@ import (
5150
"go.temporal.io/server/service/history/workflow"
5251
wcache "go.temporal.io/server/service/history/workflow/cache"
5352
"go.temporal.io/server/service/worker/parentclosepolicy"
53+
"google.golang.org/protobuf/proto"
5454
)
5555

5656
type (
@@ -2044,7 +2044,7 @@ func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20442044
return err
20452045
}
20462046
// TODO: Fetch control queue name from worker registry.
2047-
controlQueueName := fmt.Sprintf("/temporal-sys/worker-commands/%s/%s", nsName, task.WorkerInstanceKey)
2047+
controlQueueName := fmt.Sprintf("/temporal-sys/worker-commands/%s/%s-nexus-queue", nsName, task.WorkerInstanceKey)
20482048

20492049
cancelPayload := &workerpb.CancelActivitiesRequestPayload{
20502050
TaskTokens: taskTokens,

service/matching/forwarder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada
271271
TaskQueueMetadata: pollMetadata.taskQueueMetadata,
272272
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
273273
DeploymentOptions: pollMetadata.deploymentOptions,
274+
WorkerInstanceKey: pollMetadata.workerInstanceKey,
274275
},
275276
ForwardedSource: fwdr.partition.RpcName(),
276277
Conditions: pollMetadata.conditions,

service/matching/matching_engine.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type (
9494
conditions *matchingservice.PollConditions
9595
forwardedFrom string
9696
localPollStartTime time.Time
97+
workerInstanceKey string
9798
}
9899

99100
userDataUpdate struct {
@@ -942,6 +943,7 @@ pollLoop:
942943
deploymentOptions: request.DeploymentOptions,
943944
forwardedFrom: req.ForwardedSource,
944945
conditions: req.Conditions,
946+
workerInstanceKey: request.WorkerInstanceKey,
945947
}
946948
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
947949
if err != nil {
@@ -2522,6 +2524,9 @@ pollLoop:
25222524
serializedToken, _ := e.tokenSerializer.SerializeNexusTaskToken(taskToken)
25232525

25242526
nexusReq := task.nexus.request.GetRequest()
2527+
if nexusReq.Header == nil {
2528+
nexusReq.Header = make(map[string]string)
2529+
}
25252530
nexusReq.Header[nexus.HeaderRequestTimeout] = time.Until(task.nexus.deadline).String()
25262531
// Java SDK currently expects the header in this form. We should be able to remove this duplication sometime mid 2025.
25272532
nexusReq.Header["Request-Timeout"] = time.Until(task.nexus.deadline).String()

service/matching/pri_forwarder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ func ForwardPollWithTarget(
241241
TaskQueueMetadata: pollMetadata.taskQueueMetadata,
242242
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
243243
DeploymentOptions: pollMetadata.deploymentOptions,
244+
WorkerInstanceKey: pollMetadata.workerInstanceKey,
244245
},
245246
ForwardedSource: source.RpcName(),
246247
Conditions: pollMetadata.conditions,
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/suite"
9+
commandpb "go.temporal.io/api/command/v1"
10+
commonpb "go.temporal.io/api/common/v1"
11+
enumspb "go.temporal.io/api/enums/v1"
12+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
13+
"go.temporal.io/api/workflowservice/v1"
14+
"go.temporal.io/server/common/dynamicconfig"
15+
"go.temporal.io/server/common/testing/taskpoller"
16+
"go.temporal.io/server/common/testing/testvars"
17+
"go.temporal.io/server/tests/testcore"
18+
"google.golang.org/protobuf/types/known/durationpb"
19+
)
20+
21+
type CancelRunningActivitiesUsingNexusTaskSuite struct {
22+
testcore.FunctionalTestBase
23+
}
24+
25+
func TestCancelRunningActivitiesUsingNexusTaskSuite(t *testing.T) {
26+
t.Parallel()
27+
suite.Run(t, new(CancelRunningActivitiesUsingNexusTaskSuite))
28+
}
29+
30+
// Tests that when a workflow is cancelled, all running activities are also cancelled.
31+
// The cancellation request should be delivered to the worker's control queue via the Nexus service.
32+
//
33+
// - Start a workflow that schedules a long running activity.
34+
// - Poll the activity task queue and start running the activity.
35+
// - Poll the control task queue and wait for the cancel request to be delivered.
36+
// - Request the workflow to be cancelled.
37+
// - Verify that the cancel request was delivered to the control queue.
38+
func (s *CancelRunningActivitiesUsingNexusTaskSuite) TestDispatchCancelToWorker() {
39+
// Enable the feature
40+
cleanup := s.OverrideDynamicConfig(dynamicconfig.EnableActivityCancellationNexusTask, true)
41+
defer cleanup()
42+
43+
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
44+
defer cancel()
45+
46+
tv := testvars.New(s.T())
47+
poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace().String())
48+
49+
// Get the control queue name from test vars
50+
controlQueueName := tv.ControlQueueName(s.Namespace().String())
51+
s.T().Logf("WorkerInstanceKey: %s", tv.WorkerInstanceKey())
52+
s.T().Logf("ControlQueueName: %s", controlQueueName)
53+
54+
// Start the workflow
55+
startResp, err := s.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
56+
RequestId: tv.Any().String(),
57+
Namespace: s.Namespace().String(),
58+
WorkflowId: tv.WorkflowID(),
59+
WorkflowType: tv.WorkflowType(),
60+
TaskQueue: tv.TaskQueue(),
61+
WorkflowExecutionTimeout: durationpb.New(60 * time.Second),
62+
WorkflowTaskTimeout: durationpb.New(10 * time.Second),
63+
})
64+
s.NoError(err)
65+
s.T().Logf("Started workflow: %s/%s", tv.WorkflowID(), startResp.RunId)
66+
67+
// Poll and complete first workflow task - schedule the activity
68+
_, err = poller.PollAndHandleWorkflowTask(tv,
69+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
70+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
71+
Commands: []*commandpb.Command{
72+
{
73+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
74+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
75+
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
76+
ActivityId: tv.ActivityID(),
77+
ActivityType: tv.ActivityType(),
78+
TaskQueue: tv.TaskQueue(),
79+
ScheduleToCloseTimeout: durationpb.New(60 * time.Second),
80+
StartToCloseTimeout: durationpb.New(60 * time.Second),
81+
},
82+
},
83+
},
84+
},
85+
}, nil
86+
})
87+
s.NoError(err)
88+
s.T().Log("Scheduled activity")
89+
90+
// Poll for activity task and start running the activity.
91+
activityPollResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
92+
Namespace: s.Namespace().String(),
93+
TaskQueue: tv.TaskQueue(),
94+
Identity: tv.WorkerIdentity(),
95+
WorkerInstanceKey: tv.WorkerInstanceKey(),
96+
})
97+
s.NoError(err)
98+
s.NotNil(activityPollResp)
99+
s.NotEmpty(activityPollResp.TaskToken)
100+
s.T().Log("Activity started with WorkerInstanceKey")
101+
102+
// Request workflow cancellation
103+
s.T().Log("Requesting workflow cancellation...")
104+
_, err = s.FrontendClient().RequestCancelWorkflowExecution(ctx, &workflowservice.RequestCancelWorkflowExecutionRequest{
105+
Namespace: s.Namespace().String(),
106+
WorkflowExecution: &commonpb.WorkflowExecution{
107+
WorkflowId: tv.WorkflowID(),
108+
RunId: startResp.RunId,
109+
},
110+
})
111+
s.NoError(err)
112+
113+
// Simulate what the SDK does when a workflow is cancelled.
114+
// - Poll and complete the workflow task with RequestCancelActivityTask command
115+
// - This sets CancelRequested=true and triggers the dispatch of the cancel task to the worker's control queue.
116+
_, err = poller.PollAndHandleWorkflowTask(tv,
117+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
118+
// Find the scheduled event ID
119+
var scheduledEventID int64
120+
for _, event := range task.History.Events {
121+
if event.EventType == enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
122+
scheduledEventID = event.EventId
123+
break
124+
}
125+
}
126+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
127+
Commands: []*commandpb.Command{
128+
{
129+
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
130+
Attributes: &commandpb.Command_RequestCancelActivityTaskCommandAttributes{
131+
RequestCancelActivityTaskCommandAttributes: &commandpb.RequestCancelActivityTaskCommandAttributes{
132+
ScheduledEventId: scheduledEventID,
133+
},
134+
},
135+
},
136+
},
137+
}, nil
138+
})
139+
s.NoError(err)
140+
s.T().Log("Workflow task completed with RequestCancelActivityTask command")
141+
142+
// Poll Nexus control queue in a loop until we receive the cancel request
143+
var nexusPollResp *workflowservice.PollNexusTaskQueueResponse
144+
deadline := time.Now().Add(120 * time.Second)
145+
for time.Now().Before(deadline) {
146+
pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second)
147+
nexusPollResp, err = s.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{
148+
Namespace: s.Namespace().String(),
149+
TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
150+
Identity: tv.WorkerIdentity(),
151+
})
152+
pollCancel()
153+
if nexusPollResp != nil && nexusPollResp.Request != nil {
154+
break
155+
}
156+
time.Sleep(100 * time.Millisecond)
157+
}
158+
159+
// Verify we received the cancel request on the control queue
160+
s.Require().NotNil(nexusPollResp, "Timed out waiting for Nexus task")
161+
s.Require().NotNil(nexusPollResp.Request, "Expected to receive Nexus request on control queue")
162+
163+
startOp := nexusPollResp.Request.GetStartOperation()
164+
s.NotNil(startOp, "Expected StartOperation in Nexus request")
165+
s.Equal("cancel-activities", startOp.Operation, "Expected cancel-activities operation")
166+
s.T().Logf("SUCCESS: Received cancel-activities Nexus request on control queue")
167+
}

0 commit comments

Comments
 (0)