Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions api/matchingservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ message PollWorkflowTaskQueueResponse {
// Raw history bytes sent from matching service when history.sendRawHistoryBetweenInternalServices is enabled.
// Matching client will deserialize this to History when it receives the response.
temporal.api.history.v1.History raw_history = 22;
// When true, this empty response was caused by the server completing the poll
// because the worker has been shut down via the ShutdownWorker API.
bool completed_by_worker_shutdown = 23;
}

// PollWorkflowTaskQueueResponseWithRawHistory is wire-compatible with PollWorkflowTaskQueueResponse.
Expand Down Expand Up @@ -112,6 +115,9 @@ message PollWorkflowTaskQueueResponseWithRawHistory {
// When matching client deserializes this to PollWorkflowTaskQueueResponse, this field
// will be automatically deserialized to the raw_history field as History.
repeated bytes raw_history = 22;
// When true, this empty response was caused by the server completing the poll
// because the worker has been shut down via the ShutdownWorker API.
bool completed_by_worker_shutdown = 23;
}

message PollActivityTaskQueueRequest {
Expand Down Expand Up @@ -149,6 +155,9 @@ message PollActivityTaskQueueResponse {
temporal.api.common.v1.RetryPolicy retry_policy = 19;
// ID of the activity run (applicable for standalone activities only)
string activity_run_id = 20;
// When true, this empty response was caused by the server completing the poll
// because the worker has been shut down via the ShutdownWorker API.
bool completed_by_worker_shutdown = 21;
}

message AddWorkflowTaskRequest {
Expand Down Expand Up @@ -579,6 +588,9 @@ message PollNexusTaskQueueRequest {
message PollNexusTaskQueueResponse {
// Response that should be delivered to the worker containing a request from DispatchNexusTaskRequest.
temporal.api.workflowservice.v1.PollNexusTaskQueueResponse response = 1;
// When true, this empty response was caused by the server completing the poll
// because the worker has been shut down via the ShutdownWorker API.
bool completed_by_worker_shutdown = 2;
}

message RespondNexusTaskCompletedRequest {
Expand Down
46 changes: 37 additions & 9 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ type (
workerInstanceKey string
}

pollResult struct {
task *internalTask
versionSetUsed bool
completedByShutdown bool
}

userDataUpdate struct {
taskQueue string
update persistence.SingleTaskQueueUserDataUpdate
Expand Down Expand Up @@ -687,13 +693,19 @@ pollLoop:
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
result, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
if errors.Is(err, errNoTasks) {
if result.completedByShutdown {
return &matchingservice.PollWorkflowTaskQueueResponseWithRawHistory{
CompletedByWorkerShutdown: true,
}, nil
}
return emptyPollWorkflowTaskQueueResponse, nil
}
return nil, err
}
task := result.task
if task.isStarted() {
// tasks received from remote are already started. So, simply forward the response
// no need to emit task dispatch latency metric because the parent partition already did it.
Expand Down Expand Up @@ -750,7 +762,7 @@ pollLoop:
}

requestClone := request
if versionSetUsed {
if result.versionSetUsed {
// We remove build ID from workerVersionCapabilities so History can differentiate between
// old and new versioning in Record*TaskStart.
// TODO: remove this block after old versioning cleanup. [cleanup-old-wv]
Expand Down Expand Up @@ -959,20 +971,26 @@ pollLoop:
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
result, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
if errors.Is(err, errNoTasks) {
if result.completedByShutdown {
return &matchingservice.PollActivityTaskQueueResponse{
CompletedByWorkerShutdown: true,
}, nil
}
return emptyPollActivityTaskQueueResponse, nil
}
return nil, err
}

task := result.task
if task.isStarted() {
// tasks received from remote are already started. So, simply forward the response
return task.pollActivityTaskQueueResponse(), nil
}
requestClone := request
if versionSetUsed {
if result.versionSetUsed {
// We remove build ID from workerVersionCapabilities so History can differentiate between
// old and new versioning in Record*TaskStart.
// TODO: remove this block after old versioning cleanup. [cleanup-old-wv]
Expand Down Expand Up @@ -2550,14 +2568,20 @@ pollLoop:
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
}
task, _, err := e.pollTask(pollerCtx, partition, pollMetadata)
result, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
if errors.Is(err, errNoTasks) {
if result.completedByShutdown {
return &matchingservice.PollNexusTaskQueueResponse{
CompletedByWorkerShutdown: true,
}, nil
}
return &matchingservice.PollNexusTaskQueueResponse{}, nil
}
return nil, err
}

task := result.task
if task.isStarted() {
// tasks received from remote are already started. So, simply forward the response
return task.pollNexusTaskQueueResponse(), nil
Expand Down Expand Up @@ -2807,10 +2831,10 @@ func (e *matchingEngineImpl) pollTask(
ctx context.Context,
partition tqid.Partition,
pollMetadata *pollMetadata,
) (*internalTask, bool, error) {
) (pollResult, error) {
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, true, loadCausePoll)
if err != nil {
return nil, false, err
return pollResult{}, err
}

pollMetadata.localPollStartTime = e.timeSource.Now()
Expand All @@ -2827,7 +2851,7 @@ func (e *matchingEngineImpl) pollTask(
tag.WorkflowTaskQueueType(partition.TaskType()),
tag.NewStringTag("worker-instance-key", workerInstanceKey),
)
return nil, false, errNoTasks
return pollResult{completedByShutdown: true}, errNoTasks
}

ctx, cancel := contextutil.WithDeadlineBuffer(ctx, pm.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
Expand All @@ -2840,6 +2864,9 @@ func (e *matchingEngineImpl) pollTask(
// Use UUID (not pollerID) because pollerID is reused when forwarded.
pollerTrackerKey := uuid.NewString()
if workerInstanceKey != "" {
if e.shutdownWorkers.Get(workerInstanceKey) != nil {
return pollResult{completedByShutdown: true}, errNoTasks
}
e.workerInstancePollers.Add(workerInstanceKey, pollerTrackerKey, cancel)
}

Expand All @@ -2850,7 +2877,8 @@ func (e *matchingEngineImpl) pollTask(
}
}()
}
return pm.PollTask(ctx, pollMetadata)
task, versionSetUsed, err := pm.PollTask(ctx, pollMetadata)
return pollResult{task: task, versionSetUsed: versionSetUsed}, err
}

// emitTaskDispatchLatency emits latency metrics for a task dispatched to a worker.
Expand Down
Loading
Loading