File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -891,10 +891,11 @@ def _get_pool_options(self: "Self") -> "Dict[str, Any]":
891891 else []
892892 )
893893
894- used_options = {(o [0 ], o [1 ]) for o in socket_options }
895- for default_option in KEEP_ALIVE_SOCKET_OPTIONS :
896- if (default_option [0 ], default_option [1 ]) not in used_options :
897- socket_options .append (default_option )
894+ if self .options ["keep_alive" ]:
895+ used_options = {(o [0 ], o [1 ]) for o in socket_options }
896+ for default_option in KEEP_ALIVE_SOCKET_OPTIONS :
897+ if (default_option [0 ], default_option [1 ]) not in used_options :
898+ socket_options .append (default_option )
898899
899900 options ["socket_options" ] = socket_options
900901
Original file line number Diff line number Diff line change @@ -219,8 +219,6 @@ def kill(self) -> None:
219219 for task in tasks_to_cancel :
220220 task .cancel ()
221221 self ._active_tasks .clear ()
222- # Reset queue to avoid stale terminators on restart
223- self ._queue = None
224222 self ._loop = None
225223 self ._task = None
226224 self ._task_for_pid = None
@@ -229,8 +227,8 @@ def start(self) -> None:
229227 if not self .is_alive :
230228 try :
231229 self ._loop = asyncio .get_running_loop ()
232- if self . _queue is None :
233- self ._queue = asyncio .Queue (maxsize = self ._queue_size )
230+ # Always create a fresh queue on start to avoid stale items
231+ self ._queue = asyncio .Queue (maxsize = self ._queue_size )
234232 with mark_sentry_task_internal ():
235233 self ._task = self ._loop .create_task (self ._target ())
236234 self ._task_for_pid = os .getpid ()
Original file line number Diff line number Diff line change @@ -1279,20 +1279,20 @@ def test_async_worker_start_no_running_loop():
12791279@pytest .mark .asyncio
12801280@pytest .mark .skipif (not PY38 , reason = "AsyncWorker requires Python 3.8+" )
12811281@pytest .mark .filterwarnings ("ignore::pytest.PytestUnraisableExceptionWarning" )
1282- async def test_async_worker_start_creates_fresh_queue_after_kill ():
1283- """Test start() creates a fresh queue after kill() resets it ."""
1282+ async def test_async_worker_start_creates_fresh_queue_on_restart ():
1283+ """Test start() creates a fresh queue on each restart ."""
12841284 from sentry_sdk .worker import AsyncWorker
12851285
12861286 worker = AsyncWorker (queue_size = 10 )
12871287 worker .start ()
1288- assert worker ._queue is not None
1289- # Kill resets queue to None to avoid stale terminators
1288+ old_queue = worker ._queue
1289+ assert old_queue is not None
12901290 worker .kill ()
12911291 await asyncio .sleep (0 ) # Allow cancelled tasks to be cleaned up
1292- assert worker ._queue is None
1293- # Restart creates a fresh queue
1292+ # Restart creates a fresh queue (avoids stale items)
12941293 worker .start ()
12951294 assert worker ._queue is not None
1295+ assert worker ._queue is not old_queue
12961296 worker .kill ()
12971297 await asyncio .sleep (0 ) # Allow cancelled tasks to be cleaned up
12981298
You can’t perform that action at this time.
0 commit comments