Commit e23223e

Reject polls from recently-shutdown workers to prevent task theft
When ShutdownWorker cancels a worker's polls via CancelOutstandingWorkerPolls, the SDK's graceful shutdown path may re-poll before fully stopping. This zombie re-poll can sync-match with retry tasks (e.g., activity retries dispatched by the timer queue), which the dying worker silently drops, leaving the task to sit until it times out.

Add a TTL cache of recently-shutdown WorkerInstanceKeys to the matching engine. Polls arriving from workers in this cache are rejected immediately with an empty response, preventing zombie re-polls from stealing tasks.

Made-with: Cursor
1 parent 792f010 commit e23223e

3 files changed

Lines changed: 95 additions & 0 deletions
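
Before the diff, a minimal, self-contained sketch of the reject-on-poll pattern the commit message describes: record a worker's instance key when it shuts down, then turn away any poll that arrives from that key within the TTL window. The sketch uses a plain map with expiry timestamps rather than the server's go.temporal.io/server/common/cache package, and the shutdownGate name and its methods are hypothetical, for illustration only.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNoTasks mirrors the sentinel the matching engine returns for an empty poll.
var errNoTasks = errors.New("no tasks")

// shutdownGate is a hypothetical stand-in for the engine's shutdownWorkers TTL cache.
type shutdownGate struct {
	mu      sync.Mutex
	ttl     time.Duration
	expires map[string]time.Time // worker instance key -> entry expiry
}

func newShutdownGate(ttl time.Duration) *shutdownGate {
	return &shutdownGate{ttl: ttl, expires: make(map[string]time.Time)}
}

// MarkShutdown records a worker as recently shut down
// (the analogue of the Put in CancelOutstandingWorkerPolls).
func (g *shutdownGate) MarkShutdown(workerInstanceKey string) {
	if workerInstanceKey == "" {
		return
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	g.expires[workerInstanceKey] = time.Now().Add(g.ttl)
}

// Rejected reports whether a poll from this worker should be rejected because
// the worker shut down within the TTL window (the analogue of the Get in pollTask).
func (g *shutdownGate) Rejected(workerInstanceKey string) bool {
	if workerInstanceKey == "" {
		return false
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	exp, ok := g.expires[workerInstanceKey]
	if !ok {
		return false
	}
	if time.Now().After(exp) {
		delete(g.expires, workerInstanceKey) // lazily drop expired entries
		return false
	}
	return true
}

func main() {
	gate := newShutdownGate(30 * time.Second)
	gate.MarkShutdown("worker-abc")

	// A zombie re-poll from the shut-down worker is turned away immediately
	// instead of sync-matching with a retry task.
	if gate.Rejected("worker-abc") {
		fmt.Println("poll rejected:", errNoTasks)
	}
}
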

service/matching/matching_engine.go

Lines changed: 20 additions & 0 deletions
@@ -33,6 +33,7 @@ import (
 	"go.temporal.io/server/client/matching"
 	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/backoff"
+	"go.temporal.io/server/common/cache"
 	"go.temporal.io/server/common/clock"
 	hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
 	"go.temporal.io/server/common/cluster"
@@ -69,6 +70,14 @@ const (
 	// If sticky poller is not seem in last 10s, we treat it as sticky worker unavailable
 	// This seems aggressive, but the default sticky schedule_to_start timeout is 5s, so 10s seems reasonable.
 	stickyPollerUnavailableWindow = 10 * time.Second
+
+	// shutdownWorkersCacheMaxSize is generous: each entry is a UUID string (~36 bytes),
+	// entries auto-expire after shutdownWorkersCacheTTL, and the cache only grows when
+	// workers shut down. Even with aggressive autoscaling, a single matching node is
+	// unlikely to see more than a few hundred worker shutdowns within the TTL window.
+	// LRU eviction ensures the oldest entries (least likely to re-poll) are evicted first.
+	shutdownWorkersCacheMaxSize = 10000
+	shutdownWorkersCacheTTL     = 30 * time.Second
 	// If a compatible poller hasn't been seen for this time, we fail the CommitBuildId
 	// Set to 70s so that it's a little over the max time a poller should be kept waiting.
 	versioningPollerSeenWindow = 70 * time.Second
@@ -166,6 +175,10 @@ type (
 		outstandingPollers collection.SyncMap[string, context.CancelFunc]
 		// workerInstancePollers tracks pollers by worker instance key for bulk cancellation during shutdown.
 		workerInstancePollers workerPollerTracker
+		// shutdownWorkers is a TTL cache of recently-shutdown worker instance keys.
+		// Polls from workers in this cache are rejected immediately to prevent
+		// zombie re-polls from stealing tasks after ShutdownWorker.
+		shutdownWorkers cache.Cache
 		// Only set if global namespaces are enabled on the cluster.
 		namespaceReplicationQueue persistence.NamespaceReplicationQueue
 		// Lock to serialize replication queue updates.
@@ -294,6 +307,7 @@ func NewEngine(
 		nexusResults:          collection.NewSyncMap[string, chan *nexusResult](),
 		outstandingPollers:    collection.NewSyncMap[string, context.CancelFunc](),
 		workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+		shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
 		namespaceReplicationQueue: namespaceReplicationQueue,
 		userDataUpdateBatchers:    collection.NewSyncMap[namespace.ID, *stream_batcher.Batcher[*userDataUpdate, error]](),
 		rateLimiter:               rateLimiter,
@@ -1218,6 +1232,9 @@ func (e *matchingEngineImpl) CancelOutstandingWorkerPolls(
 	ctx context.Context,
 	request *matchingservice.CancelOutstandingWorkerPollsRequest,
 ) (*matchingservice.CancelOutstandingWorkerPollsResponse, error) {
+	if request.WorkerInstanceKey != "" {
+		e.shutdownWorkers.Put(request.WorkerInstanceKey, struct{}{})
+	}
 	cancelledCount := e.workerInstancePollers.CancelAll(request.WorkerInstanceKey)
 	e.removePollerFromHistory(ctx, request)
 	return &matchingservice.CancelOutstandingWorkerPollsResponse{CancelledCount: cancelledCount}, nil
@@ -2861,6 +2878,9 @@ func (e *matchingEngineImpl) pollTask(
 	workerInstanceKey := pollMetadata.workerInstanceKey
 	pollerTrackerKey := uuid.NewString()
 	if workerInstanceKey != "" {
+		if e.shutdownWorkers.Get(workerInstanceKey) != nil {
+			return nil, false, errNoTasks
+		}
 		e.workerInstancePollers.Add(workerInstanceKey, pollerTrackerKey, cancel)
 	}

service/matching/matching_engine_test.go

Lines changed: 40 additions & 0 deletions
@@ -42,6 +42,7 @@ import (
 	taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
 	tokenspb "go.temporal.io/server/api/token/v1"
 	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/cache"
 	"go.temporal.io/server/common/clock"
 	hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
 	"go.temporal.io/server/common/cluster"
@@ -5690,6 +5691,7 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
 		t.Parallel()
 		engine := &matchingEngineImpl{
 			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
 		}

 		resp, err := engine.CancelOutstandingWorkerPolls(context.Background(),
@@ -5705,6 +5707,7 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
 		t.Parallel()
 		engine := &matchingEngineImpl{
 			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
 		}

 		workerKey := "test-worker"
@@ -5731,6 +5734,7 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
 		worker2Cancelled := false
 		engine := &matchingEngineImpl{
 			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
 		}

 		// Set up pollers for worker1 and worker2
@@ -5753,6 +5757,7 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
 		t.Parallel()
 		engine := &matchingEngineImpl{
 			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
 		}

 		workerKey := "test-worker"
@@ -5775,4 +5780,39 @@ func TestCancelOutstandingWorkerPolls(t *testing.T) {
 		require.True(t, childCancelled, "child partition poll should be cancelled")
 		require.True(t, parentCancelled, "parent partition poll should be cancelled")
 	})
+
+	t.Run("adds worker to shutdown cache", func(t *testing.T) {
+		t.Parallel()
+		engine := &matchingEngineImpl{
+			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
+		}
+
+		workerKey := "test-worker"
+
+		_, err := engine.CancelOutstandingWorkerPolls(context.Background(),
+			&matchingservice.CancelOutstandingWorkerPollsRequest{
+				WorkerInstanceKey: workerKey,
+			})
+
+		require.NoError(t, err)
+		require.NotNil(t, engine.shutdownWorkers.Get(workerKey), "worker should be in shutdown cache")
+	})
+
+	t.Run("empty worker key does not populate shutdown cache", func(t *testing.T) {
+		t.Parallel()
+		engine := &matchingEngineImpl{
+			workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
+			shutdownWorkers:       cache.New(shutdownWorkersCacheMaxSize, &cache.Options{TTL: shutdownWorkersCacheTTL}),
+		}
+
+		_, err := engine.CancelOutstandingWorkerPolls(context.Background(),
+			&matchingservice.CancelOutstandingWorkerPollsRequest{
+				WorkerInstanceKey: "",
+			})
+
+		require.NoError(t, err)
+		require.Equal(t, 0, engine.shutdownWorkers.Size())
+	})
 }
+

tests/task_queue_test.go

Lines changed: 35 additions & 0 deletions
@@ -1500,4 +1500,39 @@ func (s *TaskQueueSuite) TestShutdownWorkerCancelsOutstandingPolls()
 		s.NotEqual(tv.WorkerIdentity(), poller.GetIdentity(),
 			"poller should be removed from DescribeTaskQueue after shutdown")
 	}
+
+	// Verify that subsequent polls from the same worker are rejected immediately
+	// (the shutdown worker cache prevents zombie re-polls from stealing tasks).
+	// Use a long timeout so we can distinguish "rejected quickly" from "timed out".
+	rePollTimeout := 30 * time.Second
+
+	// Workflow poll should be rejected immediately.
+	wfStart := time.Now()
+	rePollCtx, rePollCancel := context.WithTimeout(ctx, rePollTimeout)
+	defer rePollCancel()
+	rePollResp, err := s.FrontendClient().PollWorkflowTaskQueue(rePollCtx, &workflowservice.PollWorkflowTaskQueueRequest{
+		Namespace:         s.Namespace().String(),
+		TaskQueue:         tv.TaskQueue(),
+		Identity:          tv.WorkerIdentity(),
+		WorkerInstanceKey: workerInstanceKey,
+	})
+	s.NoError(err)
+	s.NotNil(rePollResp)
+	s.Empty(rePollResp.GetTaskToken(), "re-poll from shutdown worker should return empty response")
+	s.Less(time.Since(wfStart), 10*time.Second, "workflow re-poll should be rejected quickly, not wait for timeout")
+
+	// Activity poll should also be rejected immediately.
+	actStart := time.Now()
+	actCtx, actCancel := context.WithTimeout(ctx, rePollTimeout)
+	defer actCancel()
+	actResp, err := s.FrontendClient().PollActivityTaskQueue(actCtx, &workflowservice.PollActivityTaskQueueRequest{
+		Namespace:         s.Namespace().String(),
+		TaskQueue:         tv.TaskQueue(),
+		Identity:          tv.WorkerIdentity(),
+		WorkerInstanceKey: workerInstanceKey,
+	})
+	s.NoError(err)
+	s.NotNil(actResp)
+	s.Empty(actResp.GetTaskToken(), "activity re-poll from shutdown worker should return empty response")
+	s.Less(time.Since(actStart), 10*time.Second, "activity re-poll should be rejected quickly, not wait for timeout")
 }
