Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/69418.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed `salt -b` (sync batch mode) failing with `SaltClientError: Some exception handling minion payload` when the salt-master runs as a non-root user (e.g. `salt`). The sync CLI batch driver had been writing batch-state persistence files (`.batch.p`, `batch_active.p`) under the master's `cachedir` from the CLI process — pre-creating the JID directory with root ownership and tripping a `PermissionError` in `local_cache.prep_jid` on the master.

The sync CLI driver no longer writes anything under the master's `cachedir` itself. Instead it ships every state transition to the master-side `BatchManager` as `salt/batch/<jid>/{new,progress,complete,halted}` events; the manager — already running as the master daemon's user — persists `.batch.p` and maintains the active-batch index on the CLI's behalf. `salt-run batch.status <jid>`, `salt-run batch.list_active`, and `salt-run batch.stop <jid>` now work for sync batches in the same deployment shape (non-root master, root CLI) where the original feature was broken. Event-bus failures degrade gracefully: the batch still completes, just without visibility from the runner commands.
194 changes: 181 additions & 13 deletions salt/cli/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,28 @@ def run(self):
self.opts, self.minions, batch_jid, driver="cli"
)

salt.utils.batch_state.write_batch_state(
batch_jid, state, self.opts, best_effort=True
)
salt.utils.batch_state.add_to_active_index(
batch_jid, self.opts, best_effort=True
# The sync CLI driver does not write under the master's
# ``cachedir`` itself. ``cachedir`` is owned by the master
# daemon's user (typically ``salt``); the CLI is normally
# invoked as ``root``, so any direct write would pre-create
# the JID directory with the wrong ownership and trip a
# ``PermissionError`` in ``local_cache.prep_jid`` when the
# master tries to write the ``jid`` file (issue #69418).
#
# Instead, we ship every state change to the master-side
# ``BatchManager`` via ``salt/batch/<jid>/{new,progress,
# complete,halted}`` events. The manager — already running
# as the master daemon's user — persists ``.batch.p`` and
# maintains the active-batch index on the CLI's behalf.
# ``batch.status`` / ``batch.list_active`` / ``batch.stop``
# see sync batches because of that handoff. All event ops
# are best-effort: if the master event bus is unreachable
# the CLI batch still completes correctly with no visibility.
self._fire_event(
salt.utils.batch_output.state_payload(state),
salt.utils.batch_output.tag_new(batch_jid),
)
self._subscribe_to_halt(batch_jid)

output = salt.utils.batch_output.CLIOutput(self.opts, quiet=self.quiet)
for down_minion in self.down_minions:
Expand Down Expand Up @@ -196,11 +212,34 @@ def run(self):
)
self._discover_late_minions(state)

# Observe halt requests from the master-side
# ``batch.stop`` runner before deciding what to do
# this tick. The runner fires ``salt/batch/<jid>/
# stop`` which the BatchManager translates into
# ``salt/batch/<jid>/halted``; we subscribed to the
# halted tag during startup.
if self._consume_halt_event(batch_jid, state):
# progress_batch already short-circuits on a
# halted state, but we still want to fall
# through the existing failhard reporting path.
pass

now = time.time()
action = salt.utils.batch_state.progress_batch(
state, new_returns, now=now, timed_out=timed_out
)

if (
action.publish
or action.finished_minions
or action.timed_out_minions
or state["halted"]
):
self._fire_event(
salt.utils.batch_output.state_payload(state),
salt.utils.batch_output.tag_progress(batch_jid),
)

if action.publish:
output.on_batch_start(action.publish)
args = [
Expand Down Expand Up @@ -259,10 +298,6 @@ def run(self):
yield {minion_id: {}}, 0
output.on_minion_timeout(minion_id)

salt.utils.batch_state.write_batch_state(
batch_jid, state, self.opts, best_effort=True
)

# Prune finished iterators; progress_batch already
# cleared their minions from state["active"].
iters = [
Expand All @@ -285,12 +320,31 @@ def run(self):

output.on_batch_done(state)
finally:
salt.utils.batch_state.remove_from_active_index(
batch_jid, self.opts, best_effort=True
terminal_tag = (
salt.utils.batch_output.tag_halted(batch_jid)
if state.get("halted")
else salt.utils.batch_output.tag_complete(batch_jid)
)
salt.utils.batch_state.write_batch_state(
batch_jid, state, self.opts, best_effort=True
self._fire_event(
salt.utils.batch_output.state_payload(state),
terminal_tag,
)
self._unsubscribe_from_halt(batch_jid)
# Tear the event handle down explicitly **before**
# ``LocalClient.destroy``. The new visibility code lazily
# creates a ``SyncWrapper(ipc_publish_server)`` (and its
# nested ``SyncWrapper(PubServerClient)``) the first time
# we ``fire_event``; each wrapper owns its own asyncio
# loop. ``LocalClient.destroy`` will close them, but
# leaving that to the implicit teardown means the
# asyncio cleanup races interpreter shutdown — on Python
# 3.14 / Windows that race drops post-``shutdown_asyncgens``
# Handles from ``_ready`` before they're awaited and
# spills ``RuntimeWarning: coroutine ... was never awaited``
# onto the CLI's stderr. Calling ``event.destroy`` here
# (while we still control the loop) plus a deterministic
# drain quiesces those warnings at the source.
self._teardown_event_handle()
self.local.destroy()

def _poll_iterators(self, iters, minion_tracker, raw_mode, raw_by_minion):
Expand Down Expand Up @@ -375,3 +429,117 @@ def _discover_late_minions(self, state):
state["pending"].append(minion_id)
if minion_id not in self.minions:
self.minions.append(minion_id)

# ------------------------------------------------------------------
# Event-bus glue — best-effort visibility for ``batch.status`` /
# ``batch.list_active`` / ``batch.stop``. Every method here
# swallows its own errors so a broken or absent event bus never
# blocks the run; the worst case is "no visibility into the
# batch from master-side runners," same as on 3007.x.
# ------------------------------------------------------------------

def _event(self):
"""Return the master event handle attached to our LocalClient."""
return getattr(self.local, "event", None)

def _fire_event(self, payload, tag):
"""Fire a batch lifecycle event; never raise."""
event = self._event()
if event is None:
return
try:
event.fire_event(payload, tag)
except Exception: # pylint: disable=broad-except
log.debug("Failed to fire %s; continuing without it", tag, exc_info=True)

def _subscribe_to_halt(self, jid):
"""Subscribe to ``salt/batch/<jid>/halted`` so we observe stops."""
event = self._event()
if event is None:
return
try:
event.subscribe(
salt.utils.batch_output.tag_halted(jid), match_type="startswith"
)
except Exception: # pylint: disable=broad-except
log.debug(
"Failed to subscribe to halted tag for %s; "
"batch.stop will not be observable from this CLI",
jid,
exc_info=True,
)

def _unsubscribe_from_halt(self, jid):
"""Counterpart to ``_subscribe_to_halt``."""
event = self._event()
if event is None:
return
try:
event.unsubscribe(
salt.utils.batch_output.tag_halted(jid), match_type="startswith"
)
except Exception: # pylint: disable=broad-except
log.debug(
"Failed to unsubscribe from halted tag for %s", jid, exc_info=True
)

def _teardown_event_handle(self):
"""
Destroy the LocalClient's event handle in-place.

Safe to call on a half-initialized or already-destroyed
client. Any failure is swallowed: the worst case is that
``LocalClient.destroy`` cleans up instead, and the
Python 3.14 / Windows teardown warning resurfaces — never
a functional regression.

After ``event.destroy``, both wrappers' asyncio loops have
been closed; a follow-on ``LocalClient.destroy`` call is a
no-op (``SaltEvent.destroy`` is idempotent — it only acts
when ``subscriber`` / ``pusher`` are still set).
"""
local = getattr(self, "local", None)
if local is None:
return
event = getattr(local, "event", None)
if event is None:
return
try:
event.destroy()
except Exception: # pylint: disable=broad-except
log.debug(
"Failed to tear down event handle cleanly; deferring "
"to LocalClient.destroy",
exc_info=True,
)

def _consume_halt_event(self, jid, state):
"""
Non-blocking poll for ``salt/batch/<jid>/halted``.

Returns ``True`` when a halt event was observed (and mutates
*state* in place to record it); ``False`` otherwise. Spurious
bus failures degrade silently to ``False``.
"""
event = self._event()
if event is None:
return False
try:
payload = event.get_event(
wait=0,
tag=salt.utils.batch_output.tag_halted(jid),
match_type="startswith",
no_block=True,
)
except Exception: # pylint: disable=broad-except
log.debug("Failed to poll halted tag for %s", jid, exc_info=True)
return False
if not isinstance(payload, dict):
return False
if payload.get("jid") != jid:
# Stray event (or a mock returning truthy garbage in
# tests). Ignore — only an explicit match counts.
return False
state["halted"] = True
state["halted_reason"] = payload.get("reason") or "stop"
return True
91 changes: 68 additions & 23 deletions salt/utils/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,28 @@ def _populate_async_methods(self):
def __repr__(self):
return f"<SyncWrapper(cls={self.cls})"

@staticmethod
def _loop_can_run_until_complete(loop):
"""
Return ``True`` iff ``loop.run_until_complete(coro)`` can drive a
freshly created coroutine to completion without immediately raising.

``BaseEventLoop.run_until_complete`` raises ``RuntimeError`` if the
loop is closed or already running, but only *after* its
``future`` argument has been evaluated. Constructing the
coroutine without being able to await it leaks it through
``coroutine.__del__`` as ``RuntimeWarning: coroutine '...' was
never awaited`` (see ``close()``). We avoid that by inspecting
the loop's state up front.
"""
if loop is None:
return False
if loop.is_closed():
return False
if loop.is_running():
return False
return True

def close(self):
for method in self._close_methods:
if method in self._async_methods:
Expand All @@ -134,33 +156,56 @@ def close(self):
)
# Shut down asyncio resources before closing the IOLoop so file descriptors
# held by pending tasks, async generators, and the default executor are released.
#
# Each of the three ``run_until_complete`` calls below takes a freshly
# constructed coroutine object as its argument. If the loop is already
# closed (or running) at that point ``run_until_complete`` raises
# ``RuntimeError`` *after* the coroutine has been created but *before*
# ``ensure_future`` wraps it — and the bare coroutine object is then
# garbage-collected unawaited, emitting a
# ``RuntimeWarning: coroutine '...' was never awaited`` on stderr. On
# Python 3.14 / Windows the batch CLI integration tests
# (``tests/pytests/integration/cli/test_batch.py::test_batch_retcode``
# and ``test_multiple_modules_in_batch``) gate on ``assert not
# cmd.stderr`` and turn that warning into a hard failure.
#
# Gate every call on ``not _loop_can_run_until_complete(loop)`` so we
# never even *construct* the inner coroutine when the loop can't drive
# it to completion.
try:
pending_tasks = [
task for task in asyncio.all_tasks(self.asyncio_loop) if not task.done()
]
if pending_tasks:
for task in pending_tasks:
task.cancel()
if self._loop_can_run_until_complete(self.asyncio_loop):
pending_tasks = [
task
for task in asyncio.all_tasks(self.asyncio_loop)
if not task.done()
]
if pending_tasks:
for task in pending_tasks:
task.cancel()
gathered = asyncio.gather(*pending_tasks, return_exceptions=True)
try:
self.asyncio_loop.run_until_complete(gathered)
except Exception: # pylint: disable=broad-except
# ``gathered`` is a Future; if run_until_complete bailed
# part-way we still need to make sure the Future is
# consumed so its exception (if any) isn't logged as
# unhandled. Tasks already cancelled above.
if not gathered.done():
gathered.cancel()

if self._loop_can_run_until_complete(self.asyncio_loop):
shutdown_agens = self.asyncio_loop.shutdown_asyncgens()
try:
self.asyncio_loop.run_until_complete(
asyncio.gather(*pending_tasks, return_exceptions=True)
)
self.asyncio_loop.run_until_complete(shutdown_agens)
except Exception: # pylint: disable=broad-except
pass

try:
self.asyncio_loop.run_until_complete(
self.asyncio_loop.shutdown_asyncgens()
)
except Exception: # pylint: disable=broad-except
pass
shutdown_agens.close()

try:
self.asyncio_loop.run_until_complete(
self.asyncio_loop.shutdown_default_executor()
)
except Exception: # pylint: disable=broad-except
pass
if self._loop_can_run_until_complete(self.asyncio_loop):
shutdown_exec = self.asyncio_loop.shutdown_default_executor()
try:
self.asyncio_loop.run_until_complete(shutdown_exec)
except Exception: # pylint: disable=broad-except
shutdown_exec.close()

except Exception as exc: # pylint: disable=broad-except
log.error("Error during asyncio shutdown: %s", exc)
Expand Down
Loading
Loading