Skip to content

Commit 94b6c73

Browse files
BYKclaude
andcommitted
fix: Capture queue ref at dispatch time in _on_task_complete
Bind the queue reference when the task is dispatched, not when the done callback fires. This prevents kill()/start() from replacing self._queue before old callbacks can call task_done(), which would corrupt the new queue's unfinished_tasks counter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d64517f commit 94b6c73

2 files changed

Lines changed: 19 additions & 11 deletions

File tree

sentry_sdk/worker.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,10 @@ async def _target(self) -> None:
305305
# Create a strong reference to the task so it can be cancelled on kill
306306
# and does not get garbage collected while running
307307
self._active_tasks.add(task)
308-
task.add_done_callback(self._on_task_complete)
308+
# Capture queue ref at dispatch time so done callbacks use the
309+
# correct queue even if kill()/start() replace self._queue.
310+
queue_ref = self._queue
311+
task.add_done_callback(lambda t: self._on_task_complete(t, queue_ref))
309312
# Yield to let the event loop run other tasks
310313
await asyncio.sleep(0)
311314
except asyncio.CancelledError:
@@ -315,7 +318,11 @@ async def _process_callback(self, callback: "Callable[[], Any]") -> None:
315318
# Callback is an async coroutine, need to await it
316319
await callback()
317320

318-
def _on_task_complete(self, task: "asyncio.Task[None]") -> None:
321+
def _on_task_complete(
322+
self,
323+
task: "asyncio.Task[None]",
324+
queue: "Optional[asyncio.Queue[Any]]" = None,
325+
) -> None:
319326
try:
320327
task.result()
321328
except asyncio.CancelledError:
@@ -324,7 +331,8 @@ def _on_task_complete(self, task: "asyncio.Task[None]") -> None:
324331
logger.error("Failed processing job", exc_info=True)
325332
finally:
326333
# Mark the task as done and remove it from the active tasks set
327-
# This happens only after the task has completed
328-
if self._queue is not None:
329-
self._queue.task_done()
334+
# Use the queue reference captured at dispatch time, not self._queue,
335+
# to avoid calling task_done() on a different queue after kill()/start().
336+
if queue is not None:
337+
queue.task_done()
330338
self._active_tasks.discard(task)

tests/test_transport.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,7 +1656,7 @@ async def will_be_cancelled():
16561656

16571657
# _on_task_complete should handle CancelledError without logging error
16581658
with mock.patch("sentry_sdk.worker.logger") as mock_logger:
1659-
worker._on_task_complete(task)
1659+
worker._on_task_complete(task, worker._queue)
16601660
mock_logger.error.assert_not_called()
16611661

16621662
# Verify task was discarded from active_tasks
@@ -1716,7 +1716,7 @@ async def simple_cb():
17161716
mock_task.result.return_value = None
17171717
worker._active_tasks.add(mock_task)
17181718

1719-
worker._on_task_complete(mock_task)
1719+
worker._on_task_complete(mock_task, worker._queue)
17201720
assert mock_task not in worker._active_tasks
17211721
worker.kill()
17221722
await asyncio.sleep(0) # Allow cancelled tasks to be cleaned up
@@ -2864,7 +2864,7 @@ def test_sync_cov_async_worker_on_task_complete_success():
28642864
mock_task.result.return_value = None
28652865
worker._active_tasks.add(mock_task)
28662866

2867-
worker._on_task_complete(mock_task)
2867+
worker._on_task_complete(mock_task, worker._queue)
28682868

28692869
mock_task.result.assert_called_once()
28702870
mock_queue.task_done.assert_called_once()
@@ -2885,7 +2885,7 @@ def test_sync_cov_async_worker_on_task_complete_cancelled():
28852885
worker._active_tasks.add(mock_task)
28862886

28872887
with mock.patch("sentry_sdk.worker.logger") as mock_logger:
2888-
worker._on_task_complete(mock_task)
2888+
worker._on_task_complete(mock_task, worker._queue)
28892889
# CancelledError should NOT trigger error logging
28902890
mock_logger.error.assert_not_called()
28912891

@@ -2907,7 +2907,7 @@ def test_sync_cov_async_worker_on_task_complete_exception():
29072907
worker._active_tasks.add(mock_task)
29082908

29092909
with mock.patch("sentry_sdk.worker.logger") as mock_logger:
2910-
worker._on_task_complete(mock_task)
2910+
worker._on_task_complete(mock_task, worker._queue)
29112911
mock_logger.error.assert_called_once_with(
29122912
"Failed processing job", exc_info=True
29132913
)
@@ -2929,7 +2929,7 @@ def test_sync_cov_async_worker_on_task_complete_queue_none():
29292929
worker._active_tasks.add(mock_task)
29302930

29312931
# Should not raise despite queue being None
2932-
worker._on_task_complete(mock_task)
2932+
worker._on_task_complete(mock_task, worker._queue)
29332933
assert mock_task not in worker._active_tasks
29342934

29352935

0 commit comments

Comments
 (0)