diff --git a/changelog/69187.fixed.md b/changelog/69187.fixed.md new file mode 100644 index 000000000000..4820965eba60 --- /dev/null +++ b/changelog/69187.fixed.md @@ -0,0 +1 @@ +Fix `AttributeError: 'NoneType' object has no attribute 'set_result'` raised from `salt.transport.tcp._TCPPubServerPublisher._connect` when the publisher's `close()` runs concurrently with an in-flight `_connect()` task. diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 28f9d71c6d29..2d1571a5568d 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -1675,7 +1675,13 @@ async def _connect(self, timeout=None): ) try: await self.stream.connect(sock_addr) - self._connecting_future.set_result(True) + # ``close()`` may have run while we were awaiting + # ``stream.connect()``; it nulls ``_connecting_future``. Issue + # #69187: skip the result-setting in that case rather than + # blowing up with ``'NoneType' object has no attribute + # 'set_result'``. + if self._connecting_future is not None: + self._connecting_future.set_result(True) break except Exception as e: # pylint: disable=broad-except if self.stream.closed(): @@ -1685,7 +1691,10 @@ async def _connect(self, timeout=None): if self.stream is not None: self.stream.close() self.stream = None - self._connecting_future.set_exception(e) + # Same race as above (issue #69187): if ``close()`` ran + # while we were awaiting, ``_connecting_future`` is None. + if self._connecting_future is not None: + self._connecting_future.set_exception(e) break def close(self): diff --git a/tests/pytests/functional/transport/tcp/test_pub_server.py b/tests/pytests/functional/transport/tcp/test_pub_server.py index 5abf821d6e19..99ec69a7ad77 100644 --- a/tests/pytests/functional/transport/tcp/test_pub_server.py +++ b/tests/pytests/functional/transport/tcp/test_pub_server.py @@ -1,12 +1,130 @@ import asyncio +import logging import os +import socket import time import tornado.gen +import tornado.iostream import salt.transport.tcp +async def test_publisher_close_during_connect_no_attribute_error_69187( + io_loop, monkeypatch +): + """ + Regression test for #69187. + + Drives ``_TCPPubServerPublisher`` through its real ``connect()``, + ``_connect()``, and ``close()`` entry points on a real asyncio / + tornado io_loop. The only piece we slow down is ``IOStream.connect`` + — we wrap it so the in-flight ``_connect()`` task is reliably parked + on its ``await`` when ``publisher.close()`` runs, which is the race + described in the issue. + + Without the fix the in-flight ``_connect()`` task raises + ``AttributeError: 'NoneType' object has no attribute 'set_result'`` + (or ``set_exception``). The task is scheduled with + ``io_loop.create_task()``; tornado's ``IOLoop._discard_future_result`` + callback consumes the exception and routes it through + ``IOLoop.handle_callback_exception`` → ``tornado`` logger at ERROR. + This test installs a logging handler on the ``tornado`` logger that + captures records produced during the close-during-connect window and + asserts none reference ``AttributeError``. + """ + # Pause the IOStream connect handshake until the test releases it, so + # _connect() is guaranteed to be awaiting when close() runs. + release = asyncio.Event() + started = asyncio.Event() + real_connect = tornado.iostream.IOStream.connect + + async def slow_connect(self, address, *args, **kwargs): + started.set() + await release.wait() + return await real_connect(self, address, *args, **kwargs) + + monkeypatch.setattr(tornado.iostream.IOStream, "connect", slow_connect) + + # tornado logs exceptions raised inside loop callbacks via the + # ``tornado`` / ``tornado.application`` loggers; capture those records + # for the duration of the test. + captured_records = [] + + class _Capture(logging.Handler): + def emit(self, record): + captured_records.append(record) + + capture_handler = _Capture(level=logging.DEBUG) + tornado_logger = logging.getLogger("tornado") + tornado_logger.addHandler(capture_handler) + prev_level = tornado_logger.level + tornado_logger.setLevel(logging.DEBUG) + + try: + # Bind a real listener so the eventual real connect, when it + # resumes, completes cleanly rather than blocking. + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(("127.0.0.1", 0)) + listener.listen(5) + host, port = listener.getsockname() + try: + publisher = salt.transport.tcp._TCPPubServerPublisher( + host=host, port=port, path=None, io_loop=io_loop + ) + + # publisher.connect() schedules _connect() on the io_loop via + # io_loop.create_task() and returns the connecting future. + connect_future = publisher.connect(timeout=None) + + # Wait until _connect() has reached the slow IOStream.connect + # await — _connecting_future is the live future at this point + # and close() is about to null it. + await asyncio.wait_for(started.wait(), timeout=5) + + # close() nulls _connecting_future while _connect() is parked; + # without the fix the in-flight task crashes on the next line + # of _connect() (set_result on success, set_exception on + # failure). + publisher.close() + + # Let IOStream.connect resume so _connect() unparks and walks + # into the set_result / set_exception branch. + release.set() + + # Drain the loop so the _connect() task either resolves or + # raises into tornado's discard-future-result callback. + try: + await asyncio.wait_for(connect_future, timeout=2) + except (asyncio.TimeoutError, ConnectionRefusedError, OSError): + pass + await asyncio.sleep(0.1) + finally: + listener.close() + finally: + tornado_logger.removeHandler(capture_handler) + tornado_logger.setLevel(prev_level) + + matching = [] + for record in captured_records: + message = record.getMessage() + if record.exc_info: + exc = record.exc_info[1] + chain = [] + while exc is not None: + chain.append(exc) + exc = exc.__context__ or exc.__cause__ + if any(isinstance(e, AttributeError) for e in chain): + matching.append(message) + continue + if "AttributeError" in message: + matching.append(message) + assert ( + not matching + ), f"AttributeError leaked from _connect() after close(): {matching!r}" + + async def test_pub_channel(master_opts, minion_opts, io_loop): def presence_callback(client): pass diff --git a/tests/pytests/unit/transport/test_tcp.py b/tests/pytests/unit/transport/test_tcp.py index 6ef94bcbaf23..742845ec7897 100644 --- a/tests/pytests/unit/transport/test_tcp.py +++ b/tests/pytests/unit/transport/test_tcp.py @@ -205,6 +205,121 @@ def fake_socket(family, *args, **kwargs): assert captured_family == [socket.AF_INET6] +async def test_tcppubserverpublisher_close_during_connect_no_attribute_error_69187( + io_loop, +): + """ + Regression test for #69187. + + ``_TCPPubServerPublisher.close()`` nulls ``self._connecting_future`` while + a concurrent ``_connect()`` coroutine is awaiting ``stream.connect()``. + When the await resumes (succeeds or raises), ``_connect()`` calls + ``self._connecting_future.set_result(True)`` or + ``self._connecting_future.set_exception(e)`` on ``None`` and crashes with + ``AttributeError: 'NoneType' object has no attribute 'set_result'`` (or + ``set_exception``). The original future is then orphaned and tornado + logs the misleading ``Future <...> exception was never retrieved`` + message described in the issue. + + This test drives the close-during-connect race both ways: + + 1. ``stream.connect()`` raises (the path that originally caused + ``set_exception`` to be called on ``None``). + 2. ``stream.connect()`` succeeds (the ``set_result`` path). + """ + + # ----- 1. close-during-failed-connect (set_exception path) ----- + publisher = salt.transport.tcp._TCPPubServerPublisher( + host="127.0.0.1", port=4511, path=None, io_loop=io_loop + ) + publisher._connecting_future = tornado.concurrent.Future() + connect_started = asyncio.Event() + let_connect_finish = asyncio.Event() + + class _FakeStream: + def __init__(self, *args, **kwargs): + self._closed = False + + async def connect(self, addr): + connect_started.set() + await let_connect_finish.wait() + raise tornado.iostream.StreamClosedError("Stream is closed") + + def closed(self): + return self._closed + + def close(self): + self._closed = True + + with patch("salt.transport.tcp.socket.socket", lambda *a, **kw: MagicMock()): + with patch("salt.transport.tcp.tornado.iostream.IOStream", _FakeStream): + # timeout=None means the retry-loop's "should I keep retrying?" + # check (``timeout is None or time.monotonic() > timeout_at``) + # always selects the "give up, set_exception" branch — which is + # the exact branch that crashes in the issue's stack trace + # (legacy ipc.py line 343). + connect_task = asyncio.ensure_future(publisher._connect(timeout=None)) + try: + await connect_started.wait() + # close() nulls _connecting_future while _connect is awaiting + publisher.close() + # Now release the awaited stream.connect() so _connect resumes + # and walks into the buggy ``set_exception`` line. + let_connect_finish.set() + # If the bug is present, the connect_task fails with + # AttributeError ("'NoneType' object has no attribute + # 'set_exception'"). If the bug is fixed, the task completes + # cleanly. + await asyncio.wait_for(connect_task, timeout=5) + finally: + if not connect_task.done(): + connect_task.cancel() + try: + await connect_task + except asyncio.CancelledError: + pass + + # ----- 2. close-during-successful-connect (set_result path) ----- + publisher2 = salt.transport.tcp._TCPPubServerPublisher( + host="127.0.0.1", port=4511, path=None, io_loop=io_loop + ) + publisher2._connecting_future = tornado.concurrent.Future() + connect_started2 = asyncio.Event() + let_connect_finish2 = asyncio.Event() + + class _FakeStreamOk: + def __init__(self, *args, **kwargs): + self._closed = False + + async def connect(self, addr): + connect_started2.set() + await let_connect_finish2.wait() + # successful connect — _connect will fall through to set_result + return None + + def closed(self): + return self._closed + + def close(self): + self._closed = True + + with patch("salt.transport.tcp.socket.socket", lambda *a, **kw: MagicMock()): + with patch("salt.transport.tcp.tornado.iostream.IOStream", _FakeStreamOk): + connect_task2 = asyncio.ensure_future(publisher2._connect(timeout=5)) + try: + await connect_started2.wait() + publisher2.close() + let_connect_finish2.set() + await asyncio.wait_for(connect_task2, timeout=5) + finally: + if not connect_task2.done(): + connect_task2.cancel() + try: + await connect_task2 + except asyncio.CancelledError: + pass + + @pytest.mark.usefixtures("_squash_exepected_message_client_warning") async def test_message_client_cleanup_on_close(client_socket, temp_salt_master): """