diff --git a/changelog/65709.fixed.md b/changelog/65709.fixed.md
new file mode 100644
index 000000000000..7d072e727874
--- /dev/null
+++ b/changelog/65709.fixed.md
@@ -0,0 +1,8 @@
+Fix deadlock in parallel `cmd.script` states when the script is served by the master.
+
+Same fork-inherited ZeroMQ socket race as the `file.managed` fix: a
+`cmd.script` state with `parallel: True` downloads the script via
+`cp.cache_file` in a forked child that inherited the parent's ZeroMQ
+REQ socket, deadlocking the asyncio loop at ~100% CPU. Resolved by the
+same `os.register_at_fork` handlers that drop inherited channel/socket
+references in forked children.
diff --git a/changelog/68940.fixed.md b/changelog/68940.fixed.md
new file mode 100644
index 000000000000..e09d8685b968
--- /dev/null
+++ b/changelog/68940.fixed.md
@@ -0,0 +1,9 @@
+Fix deadlock in parallel `file.managed` states when source is served by the master.
+
+Forked parallel-state children previously inherited the parent's ZeroMQ
+REQ socket and asyncio loop from `salt.fileclient.RemoteClient`,
+`salt.crypt.AsyncAuth/SAuth`, and `salt.utils.event.SaltEvent`. Multiple
+sibling children racing those handles deadlocked the asyncio loop with
+~98% CPU and never completed. Salt now registers `os.register_at_fork`
+handlers on those classes that drop inherited channel/socket references
+in any forked child; the next use rebuilds them fresh.
diff --git a/salt/crypt.py b/salt/crypt.py
index b30ee34849c0..975d73609f91 100644
--- a/salt/crypt.py
+++ b/salt/crypt.py
@@ -609,10 +609,35 @@ class AsyncAuth:
     # mapping of key -> creds
     creds_map = {}
 
+    _atfork_registered = False
+
+    @classmethod
+    def _register_atfork(cls):
+        # AsyncAuth singletons are bound to a specific tornado IOLoop
+        # whose state cannot be safely shared across a fork(). Drop the
+        # singleton map in the child so a fresh AsyncAuth (with a fresh
+        # io_loop) is created on next use. creds_map is intentionally
+        # preserved -- AES creds remain valid in the child and reusing
+        # them avoids a re-auth roundtrip on the first master RPC after
+        # fork.
+        if cls._atfork_registered or not hasattr(os, "register_at_fork"):
+            return
+        os.register_at_fork(after_in_child=cls._after_fork_in_child)
+        cls._atfork_registered = True
+
+    @classmethod
+    def _after_fork_in_child(cls):
+        try:
+            cls.instance_map = weakref.WeakKeyDictionary()
+        except Exception:  # pylint: disable=broad-except
+            # Never let an at-fork handler raise.
+            pass
+
     def __new__(cls, opts, io_loop=None):
         """
         Only create one instance of AsyncAuth per __key()
         """
+        cls._register_atfork()
         # do we have any mapping for this io_loop
         io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
         if io_loop not in AsyncAuth.instance_map:
@@ -1407,10 +1432,22 @@ class SAuth(AsyncAuth):
     # This class is only a singleton per minion/master pair
     instances = weakref.WeakValueDictionary()
 
+    # SAuth tracks atfork registration independently from AsyncAuth
+    # because it has its own singleton map that needs clearing.
+    _atfork_registered = False
+
+    @classmethod
+    def _after_fork_in_child(cls):
+        try:
+            cls.instances = weakref.WeakValueDictionary()
+        except Exception:  # pylint: disable=broad-except
+            pass
+
     def __new__(cls, opts, io_loop=None):
         """
         Only create one instance of SAuth per __key()
         """
+        cls._register_atfork()
         key = cls.__key(opts)
         auth = SAuth.instances.get(key)
         if auth is None:
diff --git a/salt/fileclient.py b/salt/fileclient.py
index d3694a316570..9adb23b5742b 100644
--- a/salt/fileclient.py
+++ b/salt/fileclient.py
@@ -13,6 +13,7 @@
 import time
 import urllib.error
 import urllib.parse
+import weakref
 
 import salt.channel.client
 import salt.client
@@ -1126,23 +1127,90 @@ class RemoteClient(Client):
     Interact with the salt master file server.
     """
 
+    # Live RemoteClient instances tracked weakly so the at-fork handler can
+    # drop ZMQ sockets / IOLoop state inherited by any forked child. Using
+    # a parent's channel from multiple sibling children races the ZMQ
+    # REQ/REP state machine and deadlocks the asyncio loop.
+    _instances = weakref.WeakSet()
+    _atfork_registered = False
+
     def __init__(self, opts):
         Client.__init__(self, opts)
         self._closing = False
-        self.channel = salt.channel.client.ReqChannel.factory(self.opts)
-        if hasattr(self.channel, "auth"):
-            self.auth = self.channel.auth
-        else:
-            self.auth = ""
+        # Eager init preserves prior __init__ semantics (existing tests
+        # expect ReqChannel.factory to be called once at construction time).
+        # After fork the at-fork handler clears _channel/_auth and the
+        # property below rebuilds them lazily on next use.
+        self._channel = salt.channel.client.ReqChannel.factory(self.opts)
+        self._auth = getattr(self._channel, "auth", "")
+        type(self)._register_atfork()
+        type(self)._instances.add(self)
+
+    @classmethod
+    def _register_atfork(cls):
+        # Key the guard on RemoteClient itself, not ``cls``: __init__ calls
+        # this through type(self), so a subclass running this __init__ would
+        # set the flag on itself only and every such subclass would register
+        # another handler over the same shared ``_instances`` set.
+        if RemoteClient._atfork_registered or not hasattr(os, "register_at_fork"):
+            return
+        os.register_at_fork(after_in_child=RemoteClient._after_fork_in_child)
+        RemoteClient._atfork_registered = True
+
+    @classmethod
+    def _after_fork_in_child(cls):
+        # Drop references to inherited ZMQ sockets and asyncio/tornado
+        # loops -- they are unsafe to use in a forked child (per ZeroMQ
+        # guide). We deliberately do NOT call .close() here: SyncWrapper
+        # close() tears down the IOLoop's FDs which were copied from the
+        # parent process state and may corrupt unrelated handlers. GC
+        # will reclaim child-side FD copies; the parent keeps its own.
+        for inst in list(cls._instances):
+            try:
+                inst._channel = None
+                inst._auth = ""
+            except Exception:  # pylint: disable=broad-except
+                # Never let an at-fork handler raise -- the child would
+                # die before any user code could log the failure.
+                pass
+
+    @property
+    def channel(self):
+        channel = getattr(self, "_channel", None)
+        if channel is None:
+            channel = salt.channel.client.ReqChannel.factory(self.opts)
+            self._channel = channel
+            self._auth = getattr(channel, "auth", "")
+        return channel
+
+    @channel.setter
+    def channel(self, value):
+        self._channel = value
+
+    @property
+    def auth(self):
+        # Reading self.channel triggers lazy reinit if the at-fork handler
+        # cleared it, which keeps _auth consistent with _channel.
+        if getattr(self, "_channel", None) is None and not self._closing:
+            self.channel  # pylint: disable=pointless-statement
+        return getattr(self, "_auth", "")
+
+    @auth.setter
+    def auth(self, value):
+        self._auth = value
 
     def _refresh_channel(self):
         """
         Reset the channel, in the event of an interruption
         """
-        # Close the previous channel
-        self.channel.close()
-        # Instantiate a new one
-        self.channel = salt.channel.client.ReqChannel.factory(self.opts)
+        old_channel = self._channel
+        self._channel = None
+        self._auth = ""
+        if old_channel is not None:
+            try:
+                old_channel.close()
+            except Exception:  # pylint: disable=broad-except
+                log.debug("Error closing channel during refresh", exc_info=True)
         return self.channel
 
     def _channel_send(self, load, raw=False):
@@ -1162,13 +1230,16 @@ def destroy(self):
             return
 
         self._closing = True
-        channel = None
-        try:
-            channel = self.channel
-        except AttributeError:
-            pass
+        # The code replaced here tolerated a missing channel attribute
+        # (AttributeError on access). Keep that tolerance for _channel so
+        # destroy() never raises when the attribute was never assigned.
+        channel = getattr(self, "_channel", None)
+        self._channel = None
         if channel is not None:
-            channel.close()
+            try:
+                channel.close()
+            except AttributeError:
+                pass
 
     def get_file(
         self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None
@@ -1507,8 +1578,30 @@ class FSClient(RemoteClient):
     def __init__(self, opts):  # pylint: disable=W0231
         Client.__init__(self, opts)  # pylint: disable=W0233
         self._closing = False
-        self.channel = salt.fileserver.FSChan(opts)
-        self.auth = DumbAuth()
+        self._channel = salt.fileserver.FSChan(opts)
+        self._auth = DumbAuth()
+        # Deliberately not added to RemoteClient._instances: FSChan is an
+        # in-process file server, not a ZMQ socket, and the at-fork handler
+        # would otherwise wipe self._channel and the channel property below
+        # would lazily rebuild it as a remote ReqChannel.
+
+    @property
+    def channel(self):
+        # FSChan has no fork hazard and we never want to silently swap it
+        # for a remote ReqChannel via the parent's lazy-rebuild path.
+        return self._channel
+
+    @channel.setter
+    def channel(self, value):
+        self._channel = value
+
+    @property
+    def auth(self):
+        return self._auth
+
+    @auth.setter
+    def auth(self, value):
+        self._auth = value
 
 
 # Provide backward compatibility for anyone directly using LocalClient (but no
diff --git a/salt/utils/event.py b/salt/utils/event.py
index 134ba4a68b5c..508c5b377785 100644
--- a/salt/utils/event.py
+++ b/salt/utils/event.py
@@ -58,6 +58,7 @@
 import logging
 import os
 import time
+import weakref
 from collections.abc import MutableMapping
 
 import salt.channel.client
@@ -212,6 +213,41 @@ class SaltEvent:
     The base class used to manage salt events
     """
 
+    # Live SaltEvent instances tracked weakly so the at-fork handler can
+    # drop ZMQ pub/push sockets and asyncio/tornado loops inherited by
+    # any forked child. Sharing a parent's subscriber across sibling
+    # children races the SUB-side message dispatch and deadlocks the
+    # asyncio loop the same way RemoteClient does -- see fileclient.py.
+    _instances = weakref.WeakSet()
+    _atfork_registered = False
+
+    @classmethod
+    def _register_atfork(cls):
+        # Key the guard on SaltEvent itself, not ``cls``: __init__ calls
+        # this through type(self), so a subclass would set the flag on
+        # itself only and each subclass would register another handler
+        # over the same shared ``_instances`` set.
+        if SaltEvent._atfork_registered or not hasattr(os, "register_at_fork"):
+            return
+        os.register_at_fork(after_in_child=SaltEvent._after_fork_in_child)
+        SaltEvent._atfork_registered = True
+
+    @classmethod
+    def _after_fork_in_child(cls):
+        # Drop inherited ZMQ socket / IOLoop references without close():
+        # close() would tear down FDs and asyncio loop state copied from
+        # the parent and could affect the parent's bus. Connections will
+        # be lazily reopened by connect_pub() / connect_pull() on next
+        # use.
+        for inst in list(cls._instances):
+            try:
+                inst.subscriber = None
+                inst.pusher = None
+                inst.cpub = False
+                inst.cpush = False
+            except Exception:  # pylint: disable=broad-except
+                pass
+
     def __init__(
         self,
         node,
@@ -264,6 +300,8 @@ def __init__(
         self.pending_tags = []
         self.pending_events = []
         self.__load_cache_regex()
+        type(self)._register_atfork()
+        type(self)._instances.add(self)
         if listen and not self.cpub:
             # Only connect to the publisher at initialization time if
             # we know we want to listen. If we connect to the publisher
diff --git a/tests/pytests/unit/test_fork_safety.py b/tests/pytests/unit/test_fork_safety.py
new file mode 100644
index 000000000000..5fa4590d465b
--- /dev/null
+++ b/tests/pytests/unit/test_fork_safety.py
@@ -0,0 +1,199 @@
+"""
+Deterministic regression tests for the fork-safety fix (issues #68940
+and #65709).
+
+The bug: ``State.call_parallel()`` forks parallel-state children with
+``multiprocessing.Process``. On Linux's default ``fork`` start method
+the child inherits the parent's live ZeroMQ REQ socket / tornado IOLoop
+held by ``salt.fileclient.RemoteClient``, ``salt.crypt.AsyncAuth`` /
+``SAuth`` and ``salt.utils.event.SaltEvent``. Two sibling children
+racing the inherited socket deadlock the asyncio loop (~100% CPU, never
+completing).
+
+The fix registers ``os.register_at_fork(after_in_child=...)`` handlers
+on those classes that drop the inherited handles in any forked child;
+the next use rebuilds them lazily.
+
+These tests assert that contract directly instead of trying to
+reproduce the deadlock. Reproducing the deadlock end to end needs a
+real master + ZeroMQ transport and is an inherently flaky "did not
+hang" assertion (a green run only means the race did not fire that
+time). Salt's in-process functional test harness uses ``FSClient``,
+which has no ZeroMQ socket, so it cannot reproduce it at all. Forking
+here and checking that the inherited references were cleared is
+deterministic, fast, and fails immediately if the at-fork registration
+is ever removed or broken.
+"""
+
+import os
+
+import pytest
+
+import salt.channel.client
+import salt.crypt
+import salt.fileclient
+import salt.utils.event
+from tests.support.mock import MagicMock, patch
+
+pytestmark = [
+    pytest.mark.skip_on_windows(
+        reason="os.fork / os.register_at_fork are POSIX-only",
+    ),
+]
+
+
+def _run_in_fork(child_verdict):
+    """Fork, run ``child_verdict`` in the child, and return the child's
+    exit code to the caller in the parent.
+
+    ``child_verdict`` returns an int: 0 means the post-fork state was
+    correct, any other value identifies which assertion failed. The
+    child uses ``os._exit`` so it never runs pytest teardown. The
+    parent only inspects the exit status, so the result is fully
+    deterministic and carries no shared state back.
+    """
+    pid = os.fork()
+    if pid == 0:  # pragma: no cover - runs only in the forked child
+        code = 99
+        try:
+            code = child_verdict()
+        except BaseException:  # pylint: disable=broad-except
+            code = 98
+        finally:
+            os._exit(code)
+    _, status = os.waitpid(pid, 0)
+    assert os.WIFEXITED(status), "forked child did not exit cleanly"
+    return os.WEXITSTATUS(status)
+
+
+def test_remoteclient_drops_channel_in_forked_child(minion_opts):
+    """``RemoteClient`` is the direct #68940 / #65709 path: ``cp.hash_file``
+    (file.managed) and ``cp.cache_file`` (cmd.script) both go through its
+    ZeroMQ REQ channel. After a fork the child must see ``_channel`` and
+    ``_auth`` cleared so it builds its own channel instead of racing the
+    parent's socket; the parent must keep its channel untouched.
+    """
+    mock_channel = MagicMock(name="ReqChannel", auth="sentinel-auth")
+    with patch.object(
+        salt.channel.client.ReqChannel, "factory", return_value=mock_channel
+    ):
+        client = salt.fileclient.RemoteClient(minion_opts)
+
+    # Parent: eager construction installed the channel.
+    assert client._channel is mock_channel
+    assert client._auth == "sentinel-auth"
+
+    def child_verdict():
+        if client._channel is not None:
+            return 11
+        if client._auth != "":
+            return 12
+        return 0
+
+    code = _run_in_fork(child_verdict)
+    assert code == 0, {
+        11: "_channel was inherited by the forked child (not cleared)",
+        12: "_auth was inherited by the forked child (not cleared)",
+        98: "child raised while checking RemoteClient state",
+    }.get(code, f"unexpected child exit code {code}")
+
+    # Parent must be untouched by the child's at-fork handler.
+    assert client._channel is mock_channel
+    assert client._auth == "sentinel-auth"
+
+
+def test_asyncauth_sauth_clear_singletons_but_keep_creds_in_forked_child():
+    """``AsyncAuth``/``SAuth`` singletons are bound to a tornado IOLoop
+    that cannot cross a fork, so their instance maps must be empty in the
+    child. ``creds_map`` is deliberately preserved -- AES creds stay
+    valid after fork and keeping them avoids a re-auth roundtrip; losing
+    it would itself be a regression of the documented behaviour.
+    """
+    salt.crypt.AsyncAuth._register_atfork()
+    salt.crypt.SAuth._register_atfork()
+
+    class _Ref:
+        """Bare ``object()`` is not weakref-able in CPython; a trivial
+        user-defined class is, which is what the Weak*Dictionary
+        containers require."""
+
+    instance_key = _Ref()  # weakref-able + hashable
+    sauth_value = _Ref()  # weakref-able value for the WeakValueDictionary
+    salt.crypt.AsyncAuth.instance_map[instance_key] = "sentinel-instance"
+    salt.crypt.AsyncAuth.creds_map["fork-test-key"] = "sentinel-creds"
+    salt.crypt.SAuth.instances["fork-test-key"] = sauth_value
+
+    try:
+        assert instance_key in salt.crypt.AsyncAuth.instance_map
+        assert salt.crypt.AsyncAuth.creds_map["fork-test-key"] == "sentinel-creds"
+        assert "fork-test-key" in salt.crypt.SAuth.instances
+
+        def child_verdict():
+            if len(salt.crypt.AsyncAuth.instance_map) != 0:
+                return 21
+            if len(salt.crypt.SAuth.instances) != 0:
+                return 22
+            if salt.crypt.AsyncAuth.creds_map.get("fork-test-key") != "sentinel-creds":
+                return 23
+            return 0
+
+        code = _run_in_fork(child_verdict)
+        assert code == 0, {
+            21: "AsyncAuth.instance_map was inherited (not reset) in the child",
+            22: "SAuth.instances was inherited (not reset) in the child",
+            23: "creds_map was wrongly dropped in the child (should persist)",
+            98: "child raised while checking AsyncAuth/SAuth state",
+        }.get(code, f"unexpected child exit code {code}")
+
+        # Parent keeps everything.
+        assert instance_key in salt.crypt.AsyncAuth.instance_map
+        assert salt.crypt.AsyncAuth.creds_map["fork-test-key"] == "sentinel-creds"
+        assert "fork-test-key" in salt.crypt.SAuth.instances
+    finally:
+        salt.crypt.AsyncAuth.instance_map.pop(instance_key, None)
+        salt.crypt.AsyncAuth.creds_map.pop("fork-test-key", None)
+        salt.crypt.SAuth.instances.pop("fork-test-key", None)
+
+
+def test_saltevent_drops_sockets_in_forked_child(minion_opts):
+    """A connected ``SaltEvent`` carries a subscriber/pusher socket and
+    the ``cpub``/``cpush`` connected flags. After a fork the child must
+    see all four reset so ``connect_pub``/``connect_pull`` reopen fresh
+    sockets instead of racing the parent's; the parent must keep its
+    connection state.
+    """
+    event = salt.utils.event.SaltEvent("minion", opts=minion_opts, listen=False)
+
+    # Simulate an already-connected bus without touching real sockets.
+    sentinel_sub = object()
+    sentinel_push = object()
+    event.subscriber = sentinel_sub
+    event.pusher = sentinel_push
+    event.cpub = True
+    event.cpush = True
+
+    def child_verdict():
+        if event.subscriber is not None:
+            return 31
+        if event.pusher is not None:
+            return 32
+        if event.cpub is not False:
+            return 33
+        if event.cpush is not False:
+            return 34
+        return 0
+
+    code = _run_in_fork(child_verdict)
+    assert code == 0, {
+        31: "subscriber socket was inherited by the forked child",
+        32: "pusher socket was inherited by the forked child",
+        33: "cpub flag was inherited by the forked child",
+        34: "cpush flag was inherited by the forked child",
+        98: "child raised while checking SaltEvent state",
+    }.get(code, f"unexpected child exit code {code}")
+
+    # Parent must keep its connection state.
+    assert event.subscriber is sentinel_sub
+    assert event.pusher is sentinel_push
+    assert event.cpub is True
+    assert event.cpush is True