@@ -53,11 +53,8 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityTask(
5353 return nil
5454 }
5555
56- controlQueueName , err := getNexusTaskQueue (mutableState , task .ScheduledEventIDs )
57- if err != nil {
58- return err
59- }
60- if controlQueueName == "" {
56+ // If control queue is not set, it means the worker that this activity belongs to does not support Nexus tasks.
57+ if task .WorkerControlTaskQueue == "" {
6158 return nil
6259 }
6360
@@ -69,30 +66,7 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityTask(
6966 return nil
7067 }
7168
72- return t .dispatchCancelTaskToWorker (ctx , task .NamespaceID , controlQueueName , taskTokens )
73- }
74-
75- // getNexusTaskQueue returns the Nexus control queue for a batch of activities.
76- // All activities in a batch share the same control queue. Returns error if control queues are inconsistent.
77- func getNexusTaskQueue (
78- mutableState historyi.MutableState ,
79- scheduledEventIDs []int64 ,
80- ) (string , error ) {
81- var controlQueueName string
82- for _ , scheduledEventID := range scheduledEventIDs {
83- ai , ok := mutableState .GetActivityInfo (scheduledEventID )
84- if ! ok {
85- continue
86- }
87- if controlQueueName == "" {
88- controlQueueName = ai .WorkerControlTaskQueue
89- } else if controlQueueName != ai .WorkerControlTaskQueue {
90- return "" , serviceerror .NewInternal (fmt .Sprintf (
91- "activities in batch have inconsistent control queues: %q vs %q" ,
92- controlQueueName , ai .WorkerControlTaskQueue ))
93- }
94- }
95- return controlQueueName , nil
69+ return t .dispatchCancelTaskToWorker (ctx , task .NamespaceID , task .WorkerControlTaskQueue , taskTokens )
9670}
9771
9872// buildActivityTaskTokens builds task tokens for activities that need cancellation.
0 commit comments