From 1f7f351e0464b2aa24580b961296d4a732cf4ceb Mon Sep 17 00:00:00 2001 From: co-cy Date: Sun, 3 May 2026 17:48:01 -0300 Subject: [PATCH 1/3] Fix #68940: parallel file.managed deadlocks on master file lookups Forked parallel-state children inherited the parent's live ZeroMQ REQ socket via salt.fileclient.RemoteClient. Multiple sibling children calling cp.hash_file (triggered by file.managed with salt:// sources) raced the REQ/REP state machine and deadlocked the asyncio loop with ~98% CPU. Register os.register_at_fork(after_in_child=...) handlers on the three client-side classes that hold long-lived ZMQ handles (RemoteClient, AsyncAuth/SAuth, SaltEvent) so each forked child drops the inherited socket/IOLoop references and lazily rebuilds them on first use. The parent is unaffected. Public API surface (channel, auth, subscriber, pusher) is preserved via property/setter pairs. --- changelog/68940.fixed.md | 9 ++ salt/crypt.py | 37 ++++++ salt/fileclient.py | 120 +++++++++++++++--- salt/utils/event.py | 34 +++++ .../functional/modules/state/test_state.py | 51 ++++++++ 5 files changed, 234 insertions(+), 17 deletions(-) create mode 100644 changelog/68940.fixed.md diff --git a/changelog/68940.fixed.md b/changelog/68940.fixed.md new file mode 100644 index 000000000000..e09d8685b968 --- /dev/null +++ b/changelog/68940.fixed.md @@ -0,0 +1,9 @@ +Fix deadlock in parallel `file.managed` states when source is served by the master. + +Forked parallel-state children previously inherited the parent's ZeroMQ +REQ socket and asyncio loop from `salt.fileclient.RemoteClient`, +`salt.crypt.AsyncAuth/SAuth`, and `salt.utils.event.SaltEvent`. Multiple +sibling children racing those handles deadlocked the asyncio loop with +~98% CPU and never completed. Salt now registers `os.register_at_fork` +handlers on those classes that drop inherited channel/socket references +in any forked child; the next use rebuilds them fresh. diff --git a/salt/crypt.py b/salt/crypt.py index b30ee34849c0..975d73609f91 100644 --- a/salt/crypt.py +++ b/salt/crypt.py @@ -609,10 +609,35 @@ class AsyncAuth: # mapping of key -> creds creds_map = {} + _atfork_registered = False + + @classmethod + def _register_atfork(cls): + # AsyncAuth singletons are bound to a specific tornado IOLoop + # whose state cannot be safely shared across a fork(). Drop the + # singleton map in the child so a fresh AsyncAuth (with a fresh + # io_loop) is created on next use. creds_map is intentionally + # preserved -- AES creds remain valid in the child and reusing + # them avoids a re-auth roundtrip on the first master RPC after + # fork. + if cls._atfork_registered or not hasattr(os, "register_at_fork"): + return + os.register_at_fork(after_in_child=cls._after_fork_in_child) + cls._atfork_registered = True + + @classmethod + def _after_fork_in_child(cls): + try: + cls.instance_map = weakref.WeakKeyDictionary() + except Exception: # pylint: disable=broad-except + # Never let an at-fork handler raise. + pass + def __new__(cls, opts, io_loop=None): """ Only create one instance of AsyncAuth per __key() """ + cls._register_atfork() # do we have any mapping for this io_loop io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current() if io_loop not in AsyncAuth.instance_map: @@ -1407,10 +1432,22 @@ class SAuth(AsyncAuth): # This class is only a singleton per minion/master pair instances = weakref.WeakValueDictionary() + # SAuth tracks atfork registration independently from AsyncAuth + # because it has its own singleton map that needs clearing. + _atfork_registered = False + + @classmethod + def _after_fork_in_child(cls): + try: + cls.instances = weakref.WeakValueDictionary() + except Exception: # pylint: disable=broad-except + pass + def __new__(cls, opts, io_loop=None): """ Only create one instance of SAuth per __key() """ + cls._register_atfork() key = cls.__key(opts) auth = SAuth.instances.get(key) if auth is None: diff --git a/salt/fileclient.py b/salt/fileclient.py index d3694a316570..9adb23b5742b 100644 --- a/salt/fileclient.py +++ b/salt/fileclient.py @@ -13,6 +13,7 @@ import time import urllib.error import urllib.parse +import weakref import salt.channel.client import salt.client @@ -1126,23 +1127,86 @@ class RemoteClient(Client): Interact with the salt master file server. """ + # Live RemoteClient instances tracked weakly so the at-fork handler can + # drop ZMQ sockets / IOLoop state inherited by any forked child. Using + # a parent's channel from multiple sibling children races the ZMQ + # REQ/REP state machine and deadlocks the asyncio loop. + _instances = weakref.WeakSet() + _atfork_registered = False + def __init__(self, opts): Client.__init__(self, opts) self._closing = False - self.channel = salt.channel.client.ReqChannel.factory(self.opts) - if hasattr(self.channel, "auth"): - self.auth = self.channel.auth - else: - self.auth = "" + # Eager init preserves prior __init__ semantics (existing tests + # expect ReqChannel.factory to be called once at construction time). + # After fork the at-fork handler clears _channel/_auth and the + # property below rebuilds them lazily on next use. + self._channel = salt.channel.client.ReqChannel.factory(self.opts) + self._auth = getattr(self._channel, "auth", "") + type(self)._register_atfork() + type(self)._instances.add(self) + + @classmethod + def _register_atfork(cls): + if cls._atfork_registered or not hasattr(os, "register_at_fork"): + return + os.register_at_fork(after_in_child=cls._after_fork_in_child) + cls._atfork_registered = True + + @classmethod + def _after_fork_in_child(cls): + # Drop references to inherited ZMQ sockets and asyncio/tornado + # loops -- they are unsafe to use in a forked child (per ZeroMQ + # guide). We deliberately do NOT call .close() here: SyncWrapper + # close() tears down the IOLoop's FDs which were copied from the + # parent process state and may corrupt unrelated handlers. GC + # will reclaim child-side FD copies; the parent keeps its own. + for inst in list(cls._instances): + try: + inst._channel = None + inst._auth = "" + except Exception: # pylint: disable=broad-except + # Never let an at-fork handler raise -- the child would + # die before any user code could log the failure. + pass + + @property + def channel(self): + channel = getattr(self, "_channel", None) + if channel is None: + channel = salt.channel.client.ReqChannel.factory(self.opts) + self._channel = channel + self._auth = getattr(channel, "auth", "") + return channel + + @channel.setter + def channel(self, value): + self._channel = value + + @property + def auth(self): + # Reading self.channel triggers lazy reinit if the at-fork handler + # cleared it, which keeps _auth consistent with _channel. + if getattr(self, "_channel", None) is None and not self._closing: + self.channel # pylint: disable=pointless-statement + return getattr(self, "_auth", "") + + @auth.setter + def auth(self, value): + self._auth = value def _refresh_channel(self): """ Reset the channel, in the event of an interruption """ - # Close the previous channel - self.channel.close() - # Instantiate a new one - self.channel = salt.channel.client.ReqChannel.factory(self.opts) + old_channel = self._channel + self._channel = None + self._auth = "" + if old_channel is not None: + try: + old_channel.close() + except Exception: # pylint: disable=broad-except + log.debug("Error closing channel during refresh", exc_info=True) return self.channel def _channel_send(self, load, raw=False): @@ -1162,13 +1226,13 @@ def destroy(self): return self._closing = True - channel = None - try: - channel = self.channel - except AttributeError: - pass + channel = self._channel + self._channel = None if channel is not None: - channel.close() + try: + channel.close() + except AttributeError: + pass def get_file( self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None @@ -1507,8 +1571,30 @@ class FSClient(RemoteClient): def __init__(self, opts): # pylint: disable=W0231 Client.__init__(self, opts) # pylint: disable=W0233 self._closing = False - self.channel = salt.fileserver.FSChan(opts) - self.auth = DumbAuth() + self._channel = salt.fileserver.FSChan(opts) + self._auth = DumbAuth() + # Deliberately not added to RemoteClient._instances: FSChan is an + # in-process file server, not a ZMQ socket, and the at-fork handler + # would otherwise wipe self._channel and the channel property below + # would lazily rebuild it as a remote ReqChannel. + + @property + def channel(self): + # FSChan has no fork hazard and we never want to silently swap it + # for a remote ReqChannel via the parent's lazy-rebuild path. + return self._channel + + @channel.setter + def channel(self, value): + self._channel = value + + @property + def auth(self): + return self._auth + + @auth.setter + def auth(self, value): + self._auth = value # Provide backward compatibility for anyone directly using LocalClient (but no diff --git a/salt/utils/event.py b/salt/utils/event.py index 134ba4a68b5c..508c5b377785 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -58,6 +58,7 @@ import logging import os import time +import weakref from collections.abc import MutableMapping import salt.channel.client @@ -212,6 +213,37 @@ class SaltEvent: The base class used to manage salt events """ + # Live SaltEvent instances tracked weakly so the at-fork handler can + # drop ZMQ pub/push sockets and asyncio/tornado loops inherited by + # any forked child. Sharing a parent's subscriber across sibling + # children races the SUB-side message dispatch and deadlocks the + # asyncio loop the same way RemoteClient does -- see fileclient.py. + _instances = weakref.WeakSet() + _atfork_registered = False + + @classmethod + def _register_atfork(cls): + if cls._atfork_registered or not hasattr(os, "register_at_fork"): + return + os.register_at_fork(after_in_child=cls._after_fork_in_child) + cls._atfork_registered = True + + @classmethod + def _after_fork_in_child(cls): + # Drop inherited ZMQ socket / IOLoop references without close(): + # close() would tear down FDs and asyncio loop state copied from + # the parent and could affect the parent's bus. Connections will + # be lazily reopened by connect_pub() / connect_pull() on next + # use. + for inst in list(cls._instances): + try: + inst.subscriber = None + inst.pusher = None + inst.cpub = False + inst.cpush = False + except Exception: # pylint: disable=broad-except + pass + def __init__( self, node, @@ -264,6 +296,8 @@ def __init__( self.pending_tags = [] self.pending_events = [] self.__load_cache_regex() + type(self)._register_atfork() + type(self)._instances.add(self) if listen and not self.cpub: # Only connect to the publisher at initialization time if # we know we want to listen. If we connect to the publisher diff --git a/tests/pytests/functional/modules/state/test_state.py b/tests/pytests/functional/modules/state/test_state.py index 78cf00facd39..781800625ac1 100644 --- a/tests/pytests/functional/modules/state/test_state.py +++ b/tests/pytests/functional/modules/state/test_state.py @@ -923,6 +923,57 @@ def test_state_non_base_environment(state, state_tree_prod, tmp_path): assert testfile.exists() +@pytest.mark.skip_on_windows( + reason="Skipped until parallel states can be fixed on Windows" +) +@pytest.mark.timeout(60) +def test_parallel_file_managed_from_master(state, state_tree, tmp_path): + """ + Regression test for parallel file.managed states whose source is + served by the master file server. + + Before the fork-safety fix in salt.fileclient.RemoteClient, two or + more forked children would inherit and race the parent's ZeroMQ REQ + socket on cp.hash_file, causing the asyncio loop in each child to + spin indefinitely (~98% CPU, never completing). With the fix, every + forked child drops the inherited channel via os.register_at_fork() + and lazily opens a fresh one, so concurrent salt:// hash lookups + proceed normally. + + The timeout marker is the key assertion -- without the fix the run + hangs. + """ + dest1 = tmp_path / "dest_one.txt" + dest2 = tmp_path / "dest_two.txt" + + sls_contents = textwrap.dedent( + f""" + managed_one: + file.managed: + - name: {dest1} + - source: salt://parallel_fork/source_one.txt + - parallel: True + + managed_two: + file.managed: + - name: {dest2} + - source: salt://parallel_fork/source_two.txt + - parallel: True + """ + ) + with pytest.helpers.temp_file( + "parallel_fork/source_one.txt", "contents-one\n", state_tree + ), pytest.helpers.temp_file( + "parallel_fork/source_two.txt", "contents-two\n", state_tree + ), pytest.helpers.temp_file( + "parallel_fork.sls", sls_contents, state_tree + ): + ret = state.sls("parallel_fork", __pub_jid="1") + + for state_return in ret: + assert state_return.result is True, state_return.comment + + @pytest.mark.skip_on_windows( reason="Skipped until parallel states can be fixed on Windows" ) From 80f0e472d822d20c79b3565d9b8053e4eb6e8da6 Mon Sep 17 00:00:00 2001 From: co-cy Date: Sun, 17 May 2026 17:48:08 -0300 Subject: [PATCH 2/3] Add parallel cmd.script regression test for #65709 @twangboy asked for a regression test covering the #65709 scenario (`cmd.script` with `parallel: True` hangs at ~100% CPU), which the same fork-safety fix resolves. `cmd.script` reaches the fork-inherited ZeroMQ socket race through a different master file-server call than `file.managed`: it downloads the script via `cp.cache_file` before executing it. Two parallel `cmd.script` states fork two children that race the inherited REQ socket on `cp.cache_file`, deadlocking the asyncio loop. `test_parallel_cmd_script_from_master` mirrors the existing `test_parallel_file_managed_from_master` structure (same `skip_on_windows` + `timeout(60)` markers; the timeout is the real assertion -- without the fix the run never returns) and additionally asserts both scripts actually ran. Added `changelog/65709.fixed.md` so the second issue gets its own release note. Refs: #68940, #65709 --- changelog/65709.fixed.md | 8 +++ .../functional/modules/state/test_state.py | 63 +++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 changelog/65709.fixed.md diff --git a/changelog/65709.fixed.md b/changelog/65709.fixed.md new file mode 100644 index 000000000000..7d072e727874 --- /dev/null +++ b/changelog/65709.fixed.md @@ -0,0 +1,8 @@ +Fix deadlock in parallel `cmd.script` states when the script is served by the master. + +Same fork-inherited ZeroMQ socket race as the `file.managed` fix: a +`cmd.script` state with `parallel: True` downloads the script via +`cp.cache_file` in a forked child that inherited the parent's ZeroMQ +REQ socket, deadlocking the asyncio loop at ~100% CPU. Resolved by the +same `os.register_at_fork` handlers that drop inherited channel/socket +references in forked children. diff --git a/tests/pytests/functional/modules/state/test_state.py b/tests/pytests/functional/modules/state/test_state.py index 781800625ac1..e44239ada43c 100644 --- a/tests/pytests/functional/modules/state/test_state.py +++ b/tests/pytests/functional/modules/state/test_state.py @@ -974,6 +974,69 @@ def test_parallel_file_managed_from_master(state, state_tree, tmp_path): assert state_return.result is True, state_return.comment +@pytest.mark.skip_on_windows( + reason="Skipped until parallel states can be fixed on Windows" +) +@pytest.mark.timeout(60) +def test_parallel_cmd_script_from_master(state, state_tree, tmp_path): + """ + Regression test for https://github.com/saltstack/salt/issues/65709 + (``cmd.script`` with ``parallel: True`` hangs at ~100% CPU). + + This is the same fork-inherited ZeroMQ socket race as the + ``file.managed`` regression above, reached through a different + master file-server call: ``cmd.script`` first downloads the script + via ``cp.cache_file`` before executing it. When two ``cmd.script`` + states run with ``parallel: True``, each forked child inherits the + parent's ZeroMQ REQ socket and the two children race it on + ``cp.cache_file``; the asyncio loop in each child then spins + forever waiting for a reply that was consumed by its sibling. + + With the fork-safety fix every forked child drops the inherited + channel via ``os.register_at_fork()`` and lazily opens a fresh one, + so the concurrent ``salt://`` script fetches complete normally. + + The timeout marker is the key assertion -- without the fix the run + never returns. + """ + out1 = tmp_path / "ran_one.txt" + out2 = tmp_path / "ran_two.txt" + + script_one = f'#!/bin/sh\necho one > "{out1}"\nexit 0\n' + script_two = f'#!/bin/sh\necho two > "{out2}"\nexit 0\n' + + sls_contents = textwrap.dedent( + """ + script_one: + cmd.script: + - source: salt://parallel_fork_script/one.sh + - name: one.sh + - shell: /bin/sh + - parallel: True + + script_two: + cmd.script: + - source: salt://parallel_fork_script/two.sh + - name: two.sh + - shell: /bin/sh + - parallel: True + """ + ) + with pytest.helpers.temp_file( + "parallel_fork_script/one.sh", script_one, state_tree + ), pytest.helpers.temp_file( + "parallel_fork_script/two.sh", script_two, state_tree + ), pytest.helpers.temp_file( + "parallel_fork_script.sls", sls_contents, state_tree + ): + ret = state.sls("parallel_fork_script", __pub_jid="1") + + for state_return in ret: + assert state_return.result is True, state_return.comment + assert out1.read_text().strip() == "one" + assert out2.read_text().strip() == "two" + + @pytest.mark.skip_on_windows( reason="Skipped until parallel states can be fixed on Windows" ) From 82ff79c259d737ff741e49f2fa3597d9b6f0b9e4 Mon Sep 17 00:00:00 2001 From: co-cy Date: Sun, 17 May 2026 18:30:10 -0300 Subject: [PATCH 3/3] Replace weak functional parity tests with deterministic fork-contract tests The two functional tests (test_parallel_file_managed_from_master and test_parallel_cmd_script_from_master) do not actually guard the regression: Salt's in-process functional harness uses FSClient, which has no ZeroMQ socket, so the fork-inherited-socket deadlock cannot occur there. Verified locally -- both tests pass with AND without the fix, so they would not catch a regression. Replace them with tests/pytests/unit/test_fork_safety.py, which asserts the fix's actual contract: after os.fork(), the at-fork handler must clear the inherited handles in the child while the parent keeps its own. - test_remoteclient_drops_channel_in_forked_child: the direct #68940/#65709 path (cp.hash_file / cp.cache_file go through RemoteClient's REQ channel). Child sees _channel/_auth cleared. - test_asyncauth_sauth_clear_singletons_but_keep_creds_in_forked_child: AsyncAuth.instance_map / SAuth.instances reset in child; creds_map deliberately preserved (documented behaviour). - test_saltevent_drops_sockets_in_forked_child: subscriber/pusher None and cpub/cpush False in child. These are deterministic and fast (no master daemon, no ZeroMQ, no race): each fails with the fix reverted and passes with it in place, so they are real regression guards. Verified both directions locally. Refs: #68940, #65709 --- .../functional/modules/state/test_state.py | 114 ---------- tests/pytests/unit/test_fork_safety.py | 199 ++++++++++++++++++ 2 files changed, 199 insertions(+), 114 deletions(-) create mode 100644 tests/pytests/unit/test_fork_safety.py diff --git a/tests/pytests/functional/modules/state/test_state.py b/tests/pytests/functional/modules/state/test_state.py index e44239ada43c..78cf00facd39 100644 --- a/tests/pytests/functional/modules/state/test_state.py +++ b/tests/pytests/functional/modules/state/test_state.py @@ -923,120 +923,6 @@ def test_state_non_base_environment(state, state_tree_prod, tmp_path): assert testfile.exists() -@pytest.mark.skip_on_windows( - reason="Skipped until parallel states can be fixed on Windows" -) -@pytest.mark.timeout(60) -def test_parallel_file_managed_from_master(state, state_tree, tmp_path): - """ - Regression test for parallel file.managed states whose source is - served by the master file server. - - Before the fork-safety fix in salt.fileclient.RemoteClient, two or - more forked children would inherit and race the parent's ZeroMQ REQ - socket on cp.hash_file, causing the asyncio loop in each child to - spin indefinitely (~98% CPU, never completing). With the fix, every - forked child drops the inherited channel via os.register_at_fork() - and lazily opens a fresh one, so concurrent salt:// hash lookups - proceed normally. - - The timeout marker is the key assertion -- without the fix the run - hangs. - """ - dest1 = tmp_path / "dest_one.txt" - dest2 = tmp_path / "dest_two.txt" - - sls_contents = textwrap.dedent( - f""" - managed_one: - file.managed: - - name: {dest1} - - source: salt://parallel_fork/source_one.txt - - parallel: True - - managed_two: - file.managed: - - name: {dest2} - - source: salt://parallel_fork/source_two.txt - - parallel: True - """ - ) - with pytest.helpers.temp_file( - "parallel_fork/source_one.txt", "contents-one\n", state_tree - ), pytest.helpers.temp_file( - "parallel_fork/source_two.txt", "contents-two\n", state_tree - ), pytest.helpers.temp_file( - "parallel_fork.sls", sls_contents, state_tree - ): - ret = state.sls("parallel_fork", __pub_jid="1") - - for state_return in ret: - assert state_return.result is True, state_return.comment - - -@pytest.mark.skip_on_windows( - reason="Skipped until parallel states can be fixed on Windows" -) -@pytest.mark.timeout(60) -def test_parallel_cmd_script_from_master(state, state_tree, tmp_path): - """ - Regression test for https://github.com/saltstack/salt/issues/65709 - (``cmd.script`` with ``parallel: True`` hangs at ~100% CPU). - - This is the same fork-inherited ZeroMQ socket race as the - ``file.managed`` regression above, reached through a different - master file-server call: ``cmd.script`` first downloads the script - via ``cp.cache_file`` before executing it. When two ``cmd.script`` - states run with ``parallel: True``, each forked child inherits the - parent's ZeroMQ REQ socket and the two children race it on - ``cp.cache_file``; the asyncio loop in each child then spins - forever waiting for a reply that was consumed by its sibling. - - With the fork-safety fix every forked child drops the inherited - channel via ``os.register_at_fork()`` and lazily opens a fresh one, - so the concurrent ``salt://`` script fetches complete normally. - - The timeout marker is the key assertion -- without the fix the run - never returns. - """ - out1 = tmp_path / "ran_one.txt" - out2 = tmp_path / "ran_two.txt" - - script_one = f'#!/bin/sh\necho one > "{out1}"\nexit 0\n' - script_two = f'#!/bin/sh\necho two > "{out2}"\nexit 0\n' - - sls_contents = textwrap.dedent( - """ - script_one: - cmd.script: - - source: salt://parallel_fork_script/one.sh - - name: one.sh - - shell: /bin/sh - - parallel: True - - script_two: - cmd.script: - - source: salt://parallel_fork_script/two.sh - - name: two.sh - - shell: /bin/sh - - parallel: True - """ - ) - with pytest.helpers.temp_file( - "parallel_fork_script/one.sh", script_one, state_tree - ), pytest.helpers.temp_file( - "parallel_fork_script/two.sh", script_two, state_tree - ), pytest.helpers.temp_file( - "parallel_fork_script.sls", sls_contents, state_tree - ): - ret = state.sls("parallel_fork_script", __pub_jid="1") - - for state_return in ret: - assert state_return.result is True, state_return.comment - assert out1.read_text().strip() == "one" - assert out2.read_text().strip() == "two" - - @pytest.mark.skip_on_windows( reason="Skipped until parallel states can be fixed on Windows" ) diff --git a/tests/pytests/unit/test_fork_safety.py b/tests/pytests/unit/test_fork_safety.py new file mode 100644 index 000000000000..5fa4590d465b --- /dev/null +++ b/tests/pytests/unit/test_fork_safety.py @@ -0,0 +1,199 @@ +""" +Deterministic regression tests for the fork-safety fix (issues #68940 +and #65709). + +The bug: ``State.call_parallel()`` forks parallel-state children with +``multiprocessing.Process``. On Linux's default ``fork`` start method +the child inherits the parent's live ZeroMQ REQ socket / tornado IOLoop +held by ``salt.fileclient.RemoteClient``, ``salt.crypt.AsyncAuth`` / +``SAuth`` and ``salt.utils.event.SaltEvent``. Two sibling children +racing the inherited socket deadlock the asyncio loop (~100% CPU, never +completing). + +The fix registers ``os.register_at_fork(after_in_child=...)`` handlers +on those classes that drop the inherited handles in any forked child; +the next use rebuilds them lazily. + +These tests assert that contract directly instead of trying to +reproduce the deadlock. Reproducing the deadlock end to end needs a +real master + ZeroMQ transport and is an inherently flaky "did not +hang" assertion (a green run only means the race did not fire that +time). Salt's in-process functional test harness uses ``FSClient``, +which has no ZeroMQ socket, so it cannot reproduce it at all. Forking +here and checking that the inherited references were cleared is +deterministic, fast, and fails immediately if the at-fork registration +is ever removed or broken. +""" + +import os + +import pytest + +import salt.channel.client +import salt.crypt +import salt.fileclient +import salt.utils.event +from tests.support.mock import MagicMock, patch + +pytestmark = [ + pytest.mark.skip_on_windows( + reason="os.fork / os.register_at_fork are POSIX-only", + ), +] + + +def _run_in_fork(child_verdict): + """Fork, run ``child_verdict`` in the child, and return the child's + exit code to the caller in the parent. + + ``child_verdict`` returns an int: 0 means the post-fork state was + correct, any other value identifies which assertion failed. The + child uses ``os._exit`` so it never runs pytest teardown. The + parent only inspects the exit status, so the result is fully + deterministic and carries no shared state back. + """ + pid = os.fork() + if pid == 0: # pragma: no cover - runs only in the forked child + code = 99 + try: + code = child_verdict() + except BaseException: # pylint: disable=broad-except + code = 98 + finally: + os._exit(code) + _, status = os.waitpid(pid, 0) + assert os.WIFEXITED(status), "forked child did not exit cleanly" + return os.WEXITSTATUS(status) + + +def test_remoteclient_drops_channel_in_forked_child(minion_opts): + """``RemoteClient`` is the direct #68940 / #65709 path: ``cp.hash_file`` + (file.managed) and ``cp.cache_file`` (cmd.script) both go through its + ZeroMQ REQ channel. After a fork the child must see ``_channel`` and + ``_auth`` cleared so it builds its own channel instead of racing the + parent's socket; the parent must keep its channel untouched. + """ + mock_channel = MagicMock(name="ReqChannel", auth="sentinel-auth") + with patch.object( + salt.channel.client.ReqChannel, "factory", return_value=mock_channel + ): + client = salt.fileclient.RemoteClient(minion_opts) + + # Parent: eager construction installed the channel. + assert client._channel is mock_channel + assert client._auth == "sentinel-auth" + + def child_verdict(): + if client._channel is not None: + return 11 + if client._auth != "": + return 12 + return 0 + + code = _run_in_fork(child_verdict) + assert code == 0, { + 11: "_channel was inherited by the forked child (not cleared)", + 12: "_auth was inherited by the forked child (not cleared)", + 98: "child raised while checking RemoteClient state", + }.get(code, f"unexpected child exit code {code}") + + # Parent must be untouched by the child's at-fork handler. + assert client._channel is mock_channel + assert client._auth == "sentinel-auth" + + +def test_asyncauth_sauth_clear_singletons_but_keep_creds_in_forked_child(): + """``AsyncAuth``/``SAuth`` singletons are bound to a tornado IOLoop + that cannot cross a fork, so their instance maps must be empty in the + child. ``creds_map`` is deliberately preserved -- AES creds stay + valid after fork and keeping them avoids a re-auth roundtrip; losing + it would itself be a regression of the documented behaviour. + """ + salt.crypt.AsyncAuth._register_atfork() + salt.crypt.SAuth._register_atfork() + + class _Ref: + """Bare ``object()`` is not weakref-able in CPython; a trivial + user-defined class is, which is what the Weak*Dictionary + containers require.""" + + instance_key = _Ref() # weakref-able + hashable + sauth_value = _Ref() # weakref-able value for the WeakValueDictionary + salt.crypt.AsyncAuth.instance_map[instance_key] = "sentinel-instance" + salt.crypt.AsyncAuth.creds_map["fork-test-key"] = "sentinel-creds" + salt.crypt.SAuth.instances["fork-test-key"] = sauth_value + + try: + assert instance_key in salt.crypt.AsyncAuth.instance_map + assert salt.crypt.AsyncAuth.creds_map["fork-test-key"] == "sentinel-creds" + assert "fork-test-key" in salt.crypt.SAuth.instances + + def child_verdict(): + if len(salt.crypt.AsyncAuth.instance_map) != 0: + return 21 + if len(salt.crypt.SAuth.instances) != 0: + return 22 + if salt.crypt.AsyncAuth.creds_map.get("fork-test-key") != "sentinel-creds": + return 23 + return 0 + + code = _run_in_fork(child_verdict) + assert code == 0, { + 21: "AsyncAuth.instance_map was inherited (not reset) in the child", + 22: "SAuth.instances was inherited (not reset) in the child", + 23: "creds_map was wrongly dropped in the child (should persist)", + 98: "child raised while checking AsyncAuth/SAuth state", + }.get(code, f"unexpected child exit code {code}") + + # Parent keeps everything. + assert instance_key in salt.crypt.AsyncAuth.instance_map + assert salt.crypt.AsyncAuth.creds_map["fork-test-key"] == "sentinel-creds" + assert "fork-test-key" in salt.crypt.SAuth.instances + finally: + salt.crypt.AsyncAuth.instance_map.pop(instance_key, None) + salt.crypt.AsyncAuth.creds_map.pop("fork-test-key", None) + salt.crypt.SAuth.instances.pop("fork-test-key", None) + + +def test_saltevent_drops_sockets_in_forked_child(minion_opts): + """A connected ``SaltEvent`` carries a subscriber/pusher socket and + the ``cpub``/``cpush`` connected flags. After a fork the child must + see all four reset so ``connect_pub``/``connect_pull`` reopen fresh + sockets instead of racing the parent's; the parent must keep its + connection state. + """ + event = salt.utils.event.SaltEvent("minion", opts=minion_opts, listen=False) + + # Simulate an already-connected bus without touching real sockets. + sentinel_sub = object() + sentinel_push = object() + event.subscriber = sentinel_sub + event.pusher = sentinel_push + event.cpub = True + event.cpush = True + + def child_verdict(): + if event.subscriber is not None: + return 31 + if event.pusher is not None: + return 32 + if event.cpub is not False: + return 33 + if event.cpush is not False: + return 34 + return 0 + + code = _run_in_fork(child_verdict) + assert code == 0, { + 31: "subscriber socket was inherited by the forked child", + 32: "pusher socket was inherited by the forked child", + 33: "cpub flag was inherited by the forked child", + 34: "cpush flag was inherited by the forked child", + 98: "child raised while checking SaltEvent state", + }.get(code, f"unexpected child exit code {code}") + + # Parent must keep its connection state. + assert event.subscriber is sentinel_sub + assert event.pusher is sentinel_push + assert event.cpub is True + assert event.cpush is True