feat: isolate async handler execution on dedicated worker event loop #273
siwachabhi wants to merge 1 commit into main
Conversation
Async handlers that contain blocking calls (e.g. `time.sleep`, synchronous HTTP requests) previously ran on the main uvicorn event loop, freezing `/ping` health checks and causing container termination. This introduces a dedicated persistent worker event loop in a background thread that isolates all handler execution from the main loop.

Three-way handler dispatch:
- Async generators: bridged to sync generators via `queue.Queue` on the worker loop
- Regular async: `run_coroutine_threadsafe` + `wrap_future` on the worker loop
- Sync: `run_in_threadpool` (Starlette thread pool)

Context propagation uses `contextvars.copy_context()` with the Django asgiref `_restore_context` pattern for Python 3.10+ compatibility.
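The worker-loop pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `WorkerLoop`, `run_async`, and `handler` are invented names, and a real caller inside the main event loop would `await asyncio.wrap_future(future)` instead of blocking on `future.result()`.

```python
import asyncio
import threading

class WorkerLoop:
    """A persistent event loop running on a daemon background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def run_async(self, coro):
        # Schedule the coroutine on the worker loop from another thread.
        # Blocking code inside the handler now stalls only the worker loop,
        # never the main (uvicorn) loop serving /ping.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        # From an async caller, `await asyncio.wrap_future(future)` avoids
        # blocking; .result() is fine here because we call from a plain thread.
        return future.result()

worker = WorkerLoop()

async def handler():
    await asyncio.sleep(0.01)
    return {"ok": True}

print(worker.run_async(handler()))  # {'ok': True}
```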
```python
"""Check if obj is async-callable, unwrapping functools.partial."""
while isinstance(obj, functools.partial):
    obj = obj.func
return asyncio.iscoroutinefunction(obj) or (callable(obj) and asyncio.iscoroutinefunction(obj.__call__))
```
nit: This only unwraps `functools.partial` but doesn't follow `__wrapped__` chains set by `functools.wraps`. A decorated async function would be silently misclassified as sync and dispatched to the thread pool instead of the worker loop:

```python
def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@my_decorator
async def handler(payload):
    return {"ok": True}

# _is_async_callable(handler) → False (checks wrapper, a regular def)
```

Unverified suggestion: use `inspect.unwrap()` to follow `__wrapped__` chains (it raises `ValueError` if a wrapper cycle is detected):
```python
def _unwrap(obj: Any) -> Any:
    """Unwrap functools.partial and decorator chains."""
    while isinstance(obj, functools.partial):
        obj = obj.func
    try:
        obj = inspect.unwrap(obj)
    except ValueError:
        pass  # cycle detected, use as-is
    return obj

def _is_async_callable(obj: Any) -> bool:
    obj = _unwrap(obj)
    return asyncio.iscoroutinefunction(obj) or (
        callable(obj) and asyncio.iscoroutinefunction(obj.__call__)
    )

def _is_async_gen_callable(obj: Any) -> bool:
    obj = _unwrap(obj)
    return inspect.isasyncgenfunction(obj) or (
        callable(obj) and inspect.isasyncgenfunction(obj.__call__)
    )
```
|
|
```python
async def _invoke_handler(self, handler, request_context, takes_context, payload):
    ...

def _ensure_worker_loop(self) -> asyncio.AbstractEventLoop:
    ...
```
The sync generator returned by this method is iterated by Starlette via `iterate_in_threadpool`, which holds a thread for the entire duration of the stream. The thread spends most of its time blocked on `q.get()` waiting for the next chunk from the worker loop.

Under concurrent streaming workloads (e.g., LLM streaming responses each taking ~20 seconds), this can exhaust the default thread pool (~40 threads; ref). Once exhausted, all new requests are queued waiting for a thread to free up, even though the occupied threads are mostly idle waiting on `q.get()`.
This one is expected, though; customers have at most 2 concurrent connections to runtime servers.
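For context, the `queue.Queue` bridge under discussion can be sketched like this. This is an illustrative, simplified version: `bridge_async_gen` and the unbounded queue are assumptions, not the PR's actual code, and exception propagation from the async generator is omitted.

```python
import asyncio
import queue
import threading

_SENTINEL = object()

def bridge_async_gen(agen, loop):
    """Expose an async generator running on `loop` as a sync generator."""
    q = queue.Queue()  # unbounded, so the worker loop never blocks on put()

    async def pump():
        try:
            async for chunk in agen:
                q.put(chunk)
        finally:
            q.put(_SENTINEL)  # signal end of stream (errors not propagated here)

    asyncio.run_coroutine_threadsafe(pump(), loop)

    def gen():
        while True:
            item = q.get()  # blocks the consuming thread, as the comment notes
            if item is _SENTINEL:
                return
            yield item

    return gen()

# Stand-in for the persistent worker loop: a loop on a daemon thread.
worker_loop = asyncio.new_event_loop()
threading.Thread(target=worker_loop.run_forever, daemon=True).start()

async def tokens():
    for t in ["hello", " ", "world"]:
        await asyncio.sleep(0.01)
        yield t

print(list(bridge_async_gen(tokens(), worker_loop)))  # ['hello', ' ', 'world']
```

The blocking `q.get()` is exactly why each in-flight stream pins one thread-pool thread for its whole duration.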
Summary
- Async handlers containing blocking calls (e.g. `time.sleep`, synchronous HTTP) previously froze `/ping` health checks, causing container termination; this fix ensures `/ping` always responds promptly
- Three-way dispatch: async generators bridged via `queue.Queue`, regular async via `run_coroutine_threadsafe`, sync via `run_in_threadpool`
- Propagates `contextvars` across event loop boundaries using `copy_context()` + Django asgiref `_restore_context` pattern (Python 3.10+ compatible)

Test plan
- `TestWorkerLoopInvocation` covering:
  - `/ping`
  - `create_task` survives handler return
  - `functools.partial` and callable class dispatch
  - `HEALTHY_BUSY` ping status during background tasks
- `test_async_handler_runs_on_worker_loop` to assert worker thread isolation
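The `copy_context()` + `_restore_context` propagation mentioned in the summary can be sketched as follows. The `_restore_context` body mirrors asgiref's pattern; `run_with_context` and the use of `asyncio.run` in place of the persistent worker loop are illustrative assumptions, not the PR's implementation.

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)

def _restore_context(context: contextvars.Context) -> None:
    # asgiref-style pattern: copy back any vars the handler's context changed,
    # so mutations survive the hop between event loops/threads.
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            if cvar.get() != cvalue:
                cvar.set(cvalue)
        except LookupError:
            cvar.set(cvalue)

async def handler():
    request_id.set("abc-123")  # mutation inside the handler's task context
    return "done"

def run_with_context(coro_func):
    captured = {}

    async def wrapper():
        result = await coro_func()
        # Snapshot the task's context after the handler ran: the task got its
        # own context copy at creation, so sets inside it are otherwise
        # invisible to the caller.
        captured["ctx"] = contextvars.copy_context()
        return result

    result = asyncio.run(wrapper())  # stand-in for the worker event loop
    _restore_context(captured["ctx"])
    return result

print(request_id.get())           # None
print(run_with_context(handler))  # done
print(request_id.get())           # abc-123
```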