diff --git a/CHANGES/12281.bugfix.rst b/CHANGES/12281.bugfix.rst new file mode 100644 index 00000000000..8edde0ad3de --- /dev/null +++ b/CHANGES/12281.bugfix.rst @@ -0,0 +1,4 @@ +Fixed "Future exception was never retrieved" warning when a request handler +is cancelled during TCP write back-pressure. ``_drain_helper`` now awaits the +drain waiter directly instead of wrapping it in :func:`asyncio.shield` +-- by :user:`joaquinhuigomez`. diff --git a/aiohttp/base_protocol.py b/aiohttp/base_protocol.py index 7f01830f4e9..d7d83425b88 100644 --- a/aiohttp/base_protocol.py +++ b/aiohttp/base_protocol.py @@ -97,4 +97,4 @@ async def _drain_helper(self) -> None: if waiter is None: waiter = self._loop.create_future() self._drain_waiter = waiter - await asyncio.shield(waiter) + await waiter diff --git a/tests/test_base_protocol.py b/tests/test_base_protocol.py index 713dba2d0c2..12d9466dbd0 100644 --- a/tests/test_base_protocol.py +++ b/tests/test_base_protocol.py @@ -242,6 +242,50 @@ async def wait() -> None: assert pr._drain_waiter is None +async def test_cancelled_drain_no_unhandled_future_warning() -> None: + """Cancelling a task during backpressure must not leave an orphaned future. + + When the handler task is cancelled while awaiting _drain_helper and + connection_lost fires with an exception afterward, the waiter should + already be done (cancelled) so set_exception is skipped. No "Future + exception was never retrieved" warning should appear. + + Regression test for https://github.com/aio-libs/aiohttp/issues/12281 + """ + loop = asyncio.get_event_loop() + pr = BaseProtocol(loop=loop) + tr = mock.Mock() + pr.connection_made(tr) + pr.pause_writing() + + fut = loop.create_future() + + async def wait() -> None: + fut.set_result(None) + await pr._drain_helper() + + t = loop.create_task(wait()) + await fut + t.cancel() + with suppress(asyncio.CancelledError): + await t + + # After cancellation the waiter should be done (cancelled), so + # connection_lost with an exception must not call set_exception. + assert pr._drain_waiter is not None + waiter = pr._drain_waiter + assert waiter.done(), "waiter must be cancelled when task is cancelled" + + # This previously left an orphaned future with an unhandled exception + # because asyncio.shield kept the original waiter alive and uncancelled. + exc = RuntimeError("connection died") + pr.connection_lost(exc) + assert pr._drain_waiter is None + + # Verify the waiter is cancelled, not set with an exception. + assert waiter.cancelled() # type: ignore[unreachable] + + async def test_parallel_drain_race_condition() -> None: loop = asyncio.get_event_loop() pr = BaseProtocol(loop=loop)