Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/69187.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `AttributeError: 'NoneType' object has no attribute 'set_result'` raised from `salt.transport.tcp._TCPPubServerPublisher._connect` when the publisher's `close()` runs concurrently with an in-flight `_connect()` task.
13 changes: 11 additions & 2 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,13 @@ async def _connect(self, timeout=None):
)
try:
await self.stream.connect(sock_addr)
self._connecting_future.set_result(True)
# ``close()`` may have run while we were awaiting
# ``stream.connect()``; it nulls ``_connecting_future``. Issue
# #69187: skip the result-setting in that case rather than
# blowing up with ``'NoneType' object has no attribute
# 'set_result'``.
if self._connecting_future is not None:
self._connecting_future.set_result(True)
break
except Exception as e: # pylint: disable=broad-except
if self.stream.closed():
Expand All @@ -1685,7 +1691,10 @@ async def _connect(self, timeout=None):
if self.stream is not None:
self.stream.close()
self.stream = None
self._connecting_future.set_exception(e)
# Same race as above (issue #69187): if ``close()`` ran
# while we were awaiting, ``_connecting_future`` is None.
if self._connecting_future is not None:
self._connecting_future.set_exception(e)
break

def close(self):
Expand Down
118 changes: 118 additions & 0 deletions tests/pytests/functional/transport/tcp/test_pub_server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,130 @@
import asyncio
import logging
import os
import socket
import time

import tornado.gen
import tornado.iostream

import salt.transport.tcp


async def test_publisher_close_during_connect_no_attribute_error_69187(
    io_loop, monkeypatch
):
    """
    Regression test for #69187.

    Drives ``_TCPPubServerPublisher`` through its real ``connect()`` and
    ``close()`` entry points against a real listening socket. The only
    piece that is slowed down is ``IOStream.connect``: it is wrapped so
    that the in-flight connect is parked on an ``await`` when
    ``publisher.close()`` runs, which is the race described in the issue.

    Without the fix the in-flight ``_connect()`` task raises
    ``AttributeError: 'NoneType' object has no attribute 'set_result'``
    (or ``set_exception``). The test assumes such an escaped exception is
    reported through the ``tornado`` logger hierarchy, so it captures
    every record emitted there during the race window and asserts that
    none of them carries or mentions an ``AttributeError``.
    """
    # Gate the IOStream connect handshake on an event the test controls,
    # so the connect is guaranteed to be awaiting when close() runs.
    release = asyncio.Event()
    started = asyncio.Event()
    real_connect = tornado.iostream.IOStream.connect

    async def slow_connect(self, address, *args, **kwargs):
        started.set()
        await release.wait()
        return await real_connect(self, address, *args, **kwargs)

    monkeypatch.setattr(tornado.iostream.IOStream, "connect", slow_connect)

    # Collect everything logged under the ``tornado`` logger (child
    # loggers such as ``tornado.application`` propagate to it).
    captured_records = []

    class _Capture(logging.Handler):
        def emit(self, record):
            captured_records.append(record)

    def _mentions_attribute_error(record):
        """Return True if *record* carries or names an AttributeError."""
        # Follow BOTH ``__cause__`` and ``__context__`` from every
        # exception so an explicit ``raise ... from`` is not hidden behind
        # an implicit context; ``seen`` protects against cyclic chains.
        pending = [record.exc_info[1]] if record.exc_info else []
        seen = set()
        while pending:
            exc = pending.pop()
            if exc is None or id(exc) in seen:
                continue
            seen.add(id(exc))
            if isinstance(exc, AttributeError):
                return True
            pending.extend((exc.__cause__, exc.__context__))
        return "AttributeError" in record.getMessage()

    capture_handler = _Capture(level=logging.DEBUG)
    tornado_logger = logging.getLogger("tornado")
    prev_level = tornado_logger.level
    tornado_logger.addHandler(capture_handler)
    tornado_logger.setLevel(logging.DEBUG)

    try:
        # Bind a real listener so the real connect, once released, has
        # something to connect to. The ``with`` block closes the socket
        # even when bind()/listen() themselves fail.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            host, port = listener.getsockname()

            publisher = salt.transport.tcp._TCPPubServerPublisher(
                host=host, port=port, path=None, io_loop=io_loop
            )

            # Kick off the connection attempt and keep the returned
            # future so the loop can be drained on it below.
            connect_future = publisher.connect(timeout=None)

            # Block until the wrapped IOStream.connect has been entered,
            # i.e. the connect attempt is parked on ``release``.
            await asyncio.wait_for(started.wait(), timeout=5)

            # The race: close the publisher while the connect is parked.
            publisher.close()

            # Let IOStream.connect resume so the connect code walks into
            # its set_result / set_exception branch.
            release.set()

            # Drain the loop. The future may never resolve after close(),
            # so a timeout or a connection error here is acceptable; only
            # a logged AttributeError counts as a failure.
            try:
                await asyncio.wait_for(connect_future, timeout=2)
            except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
                pass
            await asyncio.sleep(0.1)
    finally:
        tornado_logger.removeHandler(capture_handler)
        tornado_logger.setLevel(prev_level)

    matching = [
        record.getMessage()
        for record in captured_records
        if _mentions_attribute_error(record)
    ]
    assert (
        not matching
    ), f"AttributeError leaked from _connect() after close(): {matching!r}"


async def test_pub_channel(master_opts, minion_opts, io_loop):
def presence_callback(client):
pass
Expand Down
115 changes: 115 additions & 0 deletions tests/pytests/unit/transport/test_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,121 @@ def fake_socket(family, *args, **kwargs):
assert captured_family == [socket.AF_INET6]


async def test_tcppubserverpublisher_close_during_connect_no_attribute_error_69187(
    io_loop,
):
    """
    Regression test for #69187.

    ``_TCPPubServerPublisher.close()`` nulls ``self._connecting_future`` while
    a concurrent ``_connect()`` coroutine is awaiting ``stream.connect()``.
    When the await resumes (succeeds or raises), an unfixed ``_connect()``
    calls ``set_result(True)`` / ``set_exception(e)`` on ``None`` and crashes
    with ``AttributeError: 'NoneType' object has no attribute 'set_result'``
    (or ``set_exception``).

    The close-during-connect race is driven both ways:

    1. ``stream.connect()`` raises (the ``set_exception`` path).
    2. ``stream.connect()`` succeeds (the ``set_result`` path).

    In both cases the test passes only if the ``_connect()`` task finishes
    without raising.
    """

    async def _close_while_connecting(connect_error, timeout):
        """
        Run one close-during-connect race against a fake stream.

        ``connect_error`` is the exception the fake ``stream.connect()``
        raises once released, or ``None`` for a successful connect.
        ``timeout`` is forwarded unchanged to ``_connect()``.
        """
        publisher = salt.transport.tcp._TCPPubServerPublisher(
            host="127.0.0.1", port=4511, path=None, io_loop=io_loop
        )
        publisher._connecting_future = tornado.concurrent.Future()
        connect_started = asyncio.Event()
        let_connect_finish = asyncio.Event()

        class _FakeStream:
            def __init__(self, *args, **kwargs):
                self._closed = False

            async def connect(self, addr):
                # Signal that _connect() is parked here, then hold it
                # until the test has called close().
                connect_started.set()
                await let_connect_finish.wait()
                if connect_error is not None:
                    raise connect_error

            def closed(self):
                return self._closed

            def close(self):
                self._closed = True

        with patch("salt.transport.tcp.socket.socket", lambda *a, **kw: MagicMock()):
            with patch("salt.transport.tcp.tornado.iostream.IOStream", _FakeStream):
                connect_task = asyncio.ensure_future(
                    publisher._connect(timeout=timeout)
                )
                try:
                    # Bounded wait: if _connect() never reaches
                    # stream.connect() the test fails with a timeout
                    # instead of hanging the whole run.
                    await asyncio.wait_for(connect_started.wait(), timeout=5)
                    # close() nulls _connecting_future while _connect()
                    # is still awaiting the fake stream.
                    publisher.close()
                    # Release stream.connect() so _connect() resumes and
                    # reaches its set_result / set_exception line.
                    let_connect_finish.set()
                    # With the bug present this re-raises the task's
                    # AttributeError; with the fix it completes cleanly.
                    await asyncio.wait_for(connect_task, timeout=5)
                finally:
                    if not connect_task.done():
                        connect_task.cancel()
                        try:
                            await connect_task
                        except asyncio.CancelledError:
                            pass

    # ----- 1. close-during-failed-connect (set_exception path) -----
    # NOTE(review): timeout=None is intended to make _connect() give up on
    # the first failure and call set_exception instead of retrying; the
    # retry condition lives outside this test — confirm against _connect().
    await _close_while_connecting(
        tornado.iostream.StreamClosedError("Stream is closed"), timeout=None
    )

    # ----- 2. close-during-successful-connect (set_result path) -----
    await _close_while_connecting(None, timeout=5)


@pytest.mark.usefixtures("_squash_exepected_message_client_warning")
async def test_message_client_cleanup_on_close(client_socket, temp_salt_master):
"""
Expand Down
Loading