Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/65709.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Fix deadlock in parallel `cmd.script` states when the script is served by the master.

Same fork-inherited ZeroMQ socket race as the `file.managed` fix: a
`cmd.script` state with `parallel: True` downloads the script via
`cp.cache_file` in a forked child that inherited the parent's ZeroMQ
REQ socket, deadlocking the asyncio loop at ~100% CPU. Resolved by the
same `os.register_at_fork` handlers that drop inherited channel/socket
references in forked children.
9 changes: 9 additions & 0 deletions changelog/68940.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Fix deadlock in parallel `file.managed` states when source is served by the master.

Forked parallel-state children previously inherited the parent's ZeroMQ
REQ socket and asyncio loop from `salt.fileclient.RemoteClient`,
`salt.crypt.AsyncAuth/SAuth`, and `salt.utils.event.SaltEvent`. Multiple
sibling children racing those handles deadlocked the asyncio loop with
~98% CPU and never completed. Salt now registers `os.register_at_fork`
handlers on those classes that drop inherited channel/socket references
in any forked child; the next use rebuilds them fresh.
37 changes: 37 additions & 0 deletions salt/crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,35 @@ class AsyncAuth:
# mapping of key -> creds
creds_map = {}

_atfork_registered = False

@classmethod
def _register_atfork(cls):
# AsyncAuth singletons are bound to a specific tornado IOLoop
# whose state cannot be safely shared across a fork(). Drop the
# singleton map in the child so a fresh AsyncAuth (with a fresh
# io_loop) is created on next use. creds_map is intentionally
# preserved -- AES creds remain valid in the child and reusing
# them avoids a re-auth roundtrip on the first master RPC after
# fork.
if cls._atfork_registered or not hasattr(os, "register_at_fork"):
return
os.register_at_fork(after_in_child=cls._after_fork_in_child)
cls._atfork_registered = True

@classmethod
def _after_fork_in_child(cls):
try:
cls.instance_map = weakref.WeakKeyDictionary()
except Exception: # pylint: disable=broad-except
# Never let an at-fork handler raise.
pass

def __new__(cls, opts, io_loop=None):
"""
Only create one instance of AsyncAuth per __key()
"""
cls._register_atfork()
# do we have any mapping for this io_loop
io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
if io_loop not in AsyncAuth.instance_map:
Expand Down Expand Up @@ -1407,10 +1432,22 @@ class SAuth(AsyncAuth):
# This class is only a singleton per minion/master pair
instances = weakref.WeakValueDictionary()

# SAuth tracks atfork registration independently from AsyncAuth
# because it has its own singleton map that needs clearing.
_atfork_registered = False

@classmethod
def _after_fork_in_child(cls):
try:
cls.instances = weakref.WeakValueDictionary()
except Exception: # pylint: disable=broad-except
pass

def __new__(cls, opts, io_loop=None):
"""
Only create one instance of SAuth per __key()
"""
cls._register_atfork()
key = cls.__key(opts)
auth = SAuth.instances.get(key)
if auth is None:
Expand Down
120 changes: 103 additions & 17 deletions salt/fileclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import time
import urllib.error
import urllib.parse
import weakref

import salt.channel.client
import salt.client
Expand Down Expand Up @@ -1126,23 +1127,86 @@ class RemoteClient(Client):
Interact with the salt master file server.
"""

# Live RemoteClient instances tracked weakly so the at-fork handler can
# drop ZMQ sockets / IOLoop state inherited by any forked child. Using
# a parent's channel from multiple sibling children races the ZMQ
# REQ/REP state machine and deadlocks the asyncio loop.
_instances = weakref.WeakSet()
_atfork_registered = False

def __init__(self, opts):
Client.__init__(self, opts)
self._closing = False
self.channel = salt.channel.client.ReqChannel.factory(self.opts)
if hasattr(self.channel, "auth"):
self.auth = self.channel.auth
else:
self.auth = ""
# Eager init preserves prior __init__ semantics (existing tests
# expect ReqChannel.factory to be called once at construction time).
# After fork the at-fork handler clears _channel/_auth and the
# property below rebuilds them lazily on next use.
self._channel = salt.channel.client.ReqChannel.factory(self.opts)
self._auth = getattr(self._channel, "auth", "")
type(self)._register_atfork()
type(self)._instances.add(self)

@classmethod
def _register_atfork(cls):
if cls._atfork_registered or not hasattr(os, "register_at_fork"):
return
os.register_at_fork(after_in_child=cls._after_fork_in_child)
cls._atfork_registered = True

@classmethod
def _after_fork_in_child(cls):
# Drop references to inherited ZMQ sockets and asyncio/tornado
# loops -- they are unsafe to use in a forked child (per ZeroMQ
# guide). We deliberately do NOT call .close() here: SyncWrapper
# close() tears down the IOLoop's FDs which were copied from the
# parent process state and may corrupt unrelated handlers. GC
# will reclaim child-side FD copies; the parent keeps its own.
for inst in list(cls._instances):
try:
inst._channel = None
inst._auth = ""
except Exception: # pylint: disable=broad-except
# Never let an at-fork handler raise -- the child would
# die before any user code could log the failure.
pass

@property
def channel(self):
channel = getattr(self, "_channel", None)
if channel is None:
channel = salt.channel.client.ReqChannel.factory(self.opts)
self._channel = channel
self._auth = getattr(channel, "auth", "")
return channel

@channel.setter
def channel(self, value):
self._channel = value

@property
def auth(self):
# Reading self.channel triggers lazy reinit if the at-fork handler
# cleared it, which keeps _auth consistent with _channel.
if getattr(self, "_channel", None) is None and not self._closing:
self.channel # pylint: disable=pointless-statement
return getattr(self, "_auth", "")

@auth.setter
def auth(self, value):
self._auth = value

def _refresh_channel(self):
"""
Reset the channel, in the event of an interruption
"""
# Close the previous channel
self.channel.close()
# Instantiate a new one
self.channel = salt.channel.client.ReqChannel.factory(self.opts)
old_channel = self._channel
self._channel = None
self._auth = ""
if old_channel is not None:
try:
old_channel.close()
except Exception: # pylint: disable=broad-except
log.debug("Error closing channel during refresh", exc_info=True)
return self.channel

def _channel_send(self, load, raw=False):
Expand All @@ -1162,13 +1226,13 @@ def destroy(self):
return

self._closing = True
channel = None
try:
channel = self.channel
except AttributeError:
pass
channel = self._channel
self._channel = None
if channel is not None:
channel.close()
try:
channel.close()
except AttributeError:
pass

def get_file(
self, path, dest="", makedirs=False, saltenv="base", gzip=None, cachedir=None
Expand Down Expand Up @@ -1507,8 +1571,30 @@ class FSClient(RemoteClient):
def __init__(self, opts): # pylint: disable=W0231
Client.__init__(self, opts) # pylint: disable=W0233
self._closing = False
self.channel = salt.fileserver.FSChan(opts)
self.auth = DumbAuth()
self._channel = salt.fileserver.FSChan(opts)
self._auth = DumbAuth()
# Deliberately not added to RemoteClient._instances: FSChan is an
# in-process file server, not a ZMQ socket, and the at-fork handler
# would otherwise wipe self._channel and the channel property below
# would lazily rebuild it as a remote ReqChannel.

@property
def channel(self):
# FSChan has no fork hazard and we never want to silently swap it
# for a remote ReqChannel via the parent's lazy-rebuild path.
return self._channel

@channel.setter
def channel(self, value):
self._channel = value

@property
def auth(self):
return self._auth

@auth.setter
def auth(self, value):
self._auth = value


# Provide backward compatibility for anyone directly using LocalClient (but no
Expand Down
34 changes: 34 additions & 0 deletions salt/utils/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import logging
import os
import time
import weakref
from collections.abc import MutableMapping

import salt.channel.client
Expand Down Expand Up @@ -212,6 +213,37 @@ class SaltEvent:
The base class used to manage salt events
"""

# Live SaltEvent instances tracked weakly so the at-fork handler can
# drop ZMQ pub/push sockets and asyncio/tornado loops inherited by
# any forked child. Sharing a parent's subscriber across sibling
# children races the SUB-side message dispatch and deadlocks the
# asyncio loop the same way RemoteClient does -- see fileclient.py.
_instances = weakref.WeakSet()
_atfork_registered = False

@classmethod
def _register_atfork(cls):
if cls._atfork_registered or not hasattr(os, "register_at_fork"):
return
os.register_at_fork(after_in_child=cls._after_fork_in_child)
cls._atfork_registered = True

@classmethod
def _after_fork_in_child(cls):
# Drop inherited ZMQ socket / IOLoop references without close():
# close() would tear down FDs and asyncio loop state copied from
# the parent and could affect the parent's bus. Connections will
# be lazily reopened by connect_pub() / connect_pull() on next
# use.
for inst in list(cls._instances):
try:
inst.subscriber = None
inst.pusher = None
inst.cpub = False
inst.cpush = False
except Exception: # pylint: disable=broad-except
pass

def __init__(
self,
node,
Expand Down Expand Up @@ -264,6 +296,8 @@ def __init__(
self.pending_tags = []
self.pending_events = []
self.__load_cache_regex()
type(self)._register_atfork()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incredibly suspect. AI Slop?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dwoz, thanks for the review.
type(self)._register_atfork() is used intentionally here to resolve the actual class of the instance at runtime, which matters if a subclass ever overrides _register_atfork or maintains its own _atfork_registered flag. Using type(self) over a hardcoded class name preserves correct behavior under inheritance, and over self._register_atfork() it makes the class-level nature of the call explicit at the call site (and avoids accidental shadowing by an instance attribute).
That said, I agree the idiom is uncommon and can look odd at a glance. Happy to change it — a few alternatives:

self._register_atfork() — shortest, relies on the descriptor protocol to bind the correct class. Fine in practice.
self.class._register_atfork() — equivalent to type(self), slightly more conventional in some codebases.
Move registration out of init entirely — e.g. into init_subclass or module-level init — so the hook is registered once per class instead of on every instantiation. This is probably the cleanest fix architecturally, since os.register_at_fork only needs to be called once.

Let me know which direction you'd prefer and I'll update the PR.

It is important to me that the product is improved. I'm new here, if you have any general practices, I'd like to get acquainted with them.

type(self)._instances.add(self)
if listen and not self.cpub:
# Only connect to the publisher at initialization time if
# we know we want to listen. If we connect to the publisher
Expand Down
Loading
Loading