From f5074f41e90bb5b493b034d10d305e4724642adb Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 12 Jun 2026 17:18:36 -0700 Subject: [PATCH 1/7] Fix sync batch mode failing under non-root salt-master The 3008.x batch refactor introduced a regression where the sync CLI batch driver writes batch-state persistence files (``.batch.p``, ``batch_active.p``) under the master's ``cachedir`` from the CLI process. When the salt CLI is invoked as ``root`` against a master running as user ``salt`` (the packaging default), the persistence writes pre-create the JID directory with root ownership, which trips a ``PermissionError`` in ``local_cache.prep_jid`` when the master subsequently tries to write the ``jid`` file. The cascading failure surfaces to the user as ``SaltClientError: Some exception handling minion payload``. ``BatchManager`` only ever acts on ``driver="master"`` batch state (``_handle_new``/``_handle_recover`` explicitly ignore ``driver="cli"``), so the sync CLI's persistence calls served no functional purpose for any consumer of the on-disk batch state. Remove them: ``write_batch_state``, ``add_to_active_index``, and ``remove_from_active_index`` are no longer called from the sync CLI driver. Add a code comment at the deletion site explaining why. Update two parity tests in ``test_batch_parity.py`` to capture the driver state via ``progress_batch`` instead of ``write_batch_state`` (same assertions, new capture mechanism). Add a regression test in ``test_batch.py`` that asserts ``Batch.run()`` calls neither ``write_batch_state`` nor ``add_to_active_index`` / ``remove_from_active_index``. Fixes #69418 --- changelog/69418.fixed.md | 1 + salt/cli/batch.py | 29 ++++++------- tests/pytests/unit/cli/test_batch.py | 46 +++++++++++++++++++++ tests/pytests/unit/cli/test_batch_parity.py | 28 ++++++++++--- 4 files changed, 81 insertions(+), 23 deletions(-) create mode 100644 changelog/69418.fixed.md diff --git a/changelog/69418.fixed.md b/changelog/69418.fixed.md new file mode 100644 index 000000000000..f85b12ca04e1 --- /dev/null +++ b/changelog/69418.fixed.md @@ -0,0 +1 @@ +Fixed `salt -b` (sync batch mode) failing with `SaltClientError: Some exception handling minion payload` when the salt-master runs as a non-root user (e.g. `salt`). The sync CLI batch driver was writing batch-state persistence files (`.batch.p`, `batch_active.p`) under the master's `cachedir` from the CLI process — pre-creating the JID directory with root ownership and tripping a `PermissionError` in `local_cache.prep_jid` on the master. The sync driver no longer touches those files; only the async batch path and `BatchManager` (which already run with the master daemon's user) write them. diff --git a/salt/cli/batch.py b/salt/cli/batch.py index ff02bb59a754..a78dab5f6988 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -158,13 +158,18 @@ def run(self): state = salt.utils.batch_state.create_batch_state( self.opts, self.minions, batch_jid, driver="cli" ) - - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True - ) - salt.utils.batch_state.add_to_active_index( - batch_jid, self.opts, best_effort=True - ) + # The sync CLI driver intentionally does not persist + # ``.batch.p`` or update ``batch_active.p``. Those files live + # under the master's ``cachedir`` and are owned by the master + # daemon's user (typically ``salt``). Writing them from the + # CLI process — which is normally invoked as ``root`` — + # pre-creates the JID directory with root ownership and trips + # a ``PermissionError`` when the master subsequently tries to + # write the ``jid`` file via ``local_cache.prep_jid``. See + # issue #69418. ``BatchManager`` only acts on + # ``driver="master"`` state anyway, so removing these calls + # has no observable behavior change for any consumer of the + # batch state files. output = salt.utils.batch_output.CLIOutput(self.opts, quiet=self.quiet) for down_minion in self.down_minions: @@ -259,10 +264,6 @@ def run(self): yield {minion_id: {}}, 0 output.on_minion_timeout(minion_id) - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True - ) - # Prune finished iterators; progress_batch already # cleared their minions from state["active"]. iters = [ @@ -285,12 +286,6 @@ def run(self): output.on_batch_done(state) finally: - salt.utils.batch_state.remove_from_active_index( - batch_jid, self.opts, best_effort=True - ) - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True - ) self.local.destroy() def _poll_iterators(self, iters, minion_tracker, raw_mode, raw_by_minion): diff --git a/tests/pytests/unit/cli/test_batch.py b/tests/pytests/unit/cli/test_batch.py index 1e90219e4236..128c3195956e 100644 --- a/tests/pytests/unit/cli/test_batch.py +++ b/tests/pytests/unit/cli/test_batch.py @@ -545,3 +545,49 @@ def _fake_sleep(duration): # dispatch. Without batch_wait, the second dispatch would happen # immediately and no 0.02s sleeps would be issued. assert spin_sleeps[0] >= 1 + + +def test_run_does_not_write_to_master_cachedir(batch): + """ + Regression test for issue #69418. + + The sync CLI batch driver must not write batch-state persistence + files (``.batch.p``, ``batch_active.p``) under the master's + ``cachedir`` from the CLI process. The salt-master daemon is the + sole owner of that directory tree. When the CLI is invoked as + ``root`` against a master running as user ``salt``, any + CLI-initiated write (or directory creation) under the master's + cachedir produces root-owned files that the master cannot + subsequently update — which surfaces as a ``PermissionError`` in + ``local_cache.prep_jid`` and ultimately as + ``SaltClientError: Some exception handling minion payload`` on + the user's terminal. + + Assert that ``write_batch_state`` and ``add_to_active_index`` / + ``remove_from_active_index`` are not called by ``Batch.run()``. + """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with patch("salt.utils.batch_state.write_batch_state") as write_state, patch( + "salt.utils.batch_state.add_to_active_index" + ) as add_index, patch( + "salt.utils.batch_state.remove_from_active_index" + ) as remove_index: + list(Batch.run(batch)) + + write_state.assert_not_called() + add_index.assert_not_called() + remove_index.assert_not_called() diff --git a/tests/pytests/unit/cli/test_batch_parity.py b/tests/pytests/unit/cli/test_batch_parity.py index 2c3a6e3bc075..9d9ed3ea44cf 100644 --- a/tests/pytests/unit/cli/test_batch_parity.py +++ b/tests/pytests/unit/cli/test_batch_parity.py @@ -162,13 +162,21 @@ def _mock_iter(*args, **kwargs): batch.local.cmd_iter_no_block = MagicMock(side_effect=_mock_iter) + # The sync CLI driver no longer persists ``.batch.p`` (see issue + # #69418), so we capture the post-tick state by wrapping + # ``progress_batch`` in the ``salt.cli.batch`` namespace. persisted = {} + real_progress_batch = progress_batch - def _capture_state(jid, state, opts, *, best_effort=False): + def _capture_progress(state, *args, **kwargs): + action = real_progress_batch(state, *args, **kwargs) persisted["last"] = dict(state) - return True + return action - with patch("salt.utils.batch_state.write_batch_state", side_effect=_capture_state): + with patch( + "salt.cli.batch.salt.utils.batch_state.progress_batch", + side_effect=_capture_progress, + ): list(Batch.run(batch)) driver_state = persisted["last"] @@ -205,13 +213,21 @@ def _empty_iter(*args, **kwargs): batch.local.cmd_iter_no_block = MagicMock(side_effect=_empty_iter) + # See ``test_sync_driver_matches_progress_batch_oracle`` for why + # we capture the driver state via ``progress_batch`` rather than + # ``write_batch_state`` (issue #69418). persisted = {} + real_progress_batch = progress_batch - def _capture_state(jid, state, opts, *, best_effort=False): + def _capture_progress(state, *args, **kwargs): + action = real_progress_batch(state, *args, **kwargs) persisted["last"] = dict(state) - return True + return action - with patch("salt.utils.batch_state.write_batch_state", side_effect=_capture_state): + with patch( + "salt.cli.batch.salt.utils.batch_state.progress_batch", + side_effect=_capture_progress, + ): yields = list(Batch.run(batch)) assert persisted["last"]["failed"] == {"m1": "timeout", "m2": "timeout"} From fcb4d49af161ac8c5d4b4cf11da30fcead3eb567 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 12 Jun 2026 19:18:38 -0700 Subject: [PATCH 2/7] Restore sync batch visibility through the event bus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #69418's first commit removed the sync CLI driver's direct writes under the master's ``cachedir`` to fix the ``PermissionError`` that bricked ``salt -b`` whenever the master ran as a non-root user. That fix was correct for the regression but came at the cost of dropping the visibility feature added in PR #68964 — ``salt-run batch.list_active`` / ``batch.status`` / ``batch.stop`` could no longer see sync batches. This commit replaces the broken in-process disk writes with an event-bus handoff that keeps every cachedir write on the master daemon's side of the trust boundary: * ``Batch.run()`` fires ``salt/batch//{new,progress,complete, halted}`` with the full ``BatchState`` embedded in the payload (new helper ``salt.utils.batch_output.state_payload``). All fire / subscribe / poll ops are best-effort — a missing master event bus degrades to "no visibility, batch still works." * ``Batch.run()`` subscribes to ``salt/batch//halted`` for the run's duration and polls non-blocking each loop iteration so a ``salt-run batch.stop`` request actually halts the run. * ``BatchManager._handle_new`` learns to read the embedded state from the event data; for ``driver="cli"`` it persists ``.batch.p`` + the active index but does *not* drive the state machine (the CLI owns that). * New ``_handle_progress`` and ``_handle_terminal`` update the on-disk state for sync CLI batches as the run progresses and clear the active-index entry on completion. * ``_tick`` and ``_progress_one`` defensively skip ``driver="cli"`` JIDs so a stale index entry can never trigger a spurious re-publish or false timeout of in-flight minions. Tests: * New ``tests/pytests/unit/cli/test_batch_visibility.py`` wires ``Batch.run`` to a real ``BatchManager`` via a synchronous fire_event bridge and asserts the end-to-end visibility contract — ``batch.list_active`` and ``batch.status`` see the running sync batch, ``batch.stop`` halts it, the post-run active index is empty. * New unit tests in ``test_batch.py`` cover the event flow, halt-subscription lifecycle, halt observation, and best-effort failure handling. * New ``BatchManager`` tests cover CLI registration without adoption, ``_tick`` filtering, ``_handle_progress`` / ``_handle_terminal`` semantics, and the extended event dispatch. Full batch test suite: 167 passed. Fixes #69418 --- changelog/69418.fixed.md | 4 +- salt/cli/batch.py | 152 +++++++- salt/utils/batch_manager.py | 145 ++++++- salt/utils/batch_output.py | 28 ++ tests/pytests/unit/cli/test_batch.py | 155 ++++++++ .../pytests/unit/cli/test_batch_visibility.py | 357 ++++++++++++++++++ .../pytests/unit/utils/test_batch_manager.py | 182 ++++++++- 7 files changed, 989 insertions(+), 34 deletions(-) create mode 100644 tests/pytests/unit/cli/test_batch_visibility.py diff --git a/changelog/69418.fixed.md b/changelog/69418.fixed.md index f85b12ca04e1..6e84b197ae37 100644 --- a/changelog/69418.fixed.md +++ b/changelog/69418.fixed.md @@ -1 +1,3 @@ -Fixed `salt -b` (sync batch mode) failing with `SaltClientError: Some exception handling minion payload` when the salt-master runs as a non-root user (e.g. `salt`). The sync CLI batch driver was writing batch-state persistence files (`.batch.p`, `batch_active.p`) under the master's `cachedir` from the CLI process — pre-creating the JID directory with root ownership and tripping a `PermissionError` in `local_cache.prep_jid` on the master. The sync driver no longer touches those files; only the async batch path and `BatchManager` (which already run with the master daemon's user) write them. +Fixed `salt -b` (sync batch mode) failing with `SaltClientError: Some exception handling minion payload` when the salt-master runs as a non-root user (e.g. `salt`). The sync CLI batch driver had been writing batch-state persistence files (`.batch.p`, `batch_active.p`) under the master's `cachedir` from the CLI process — pre-creating the JID directory with root ownership and tripping a `PermissionError` in `local_cache.prep_jid` on the master. + +The sync CLI driver no longer writes anything under the master's `cachedir` itself. Instead it ships every state transition to the master-side `BatchManager` as `salt/batch//{new,progress,complete,halted}` events; the manager — already running as the master daemon's user — persists `.batch.p` and maintains the active-batch index on the CLI's behalf. `salt-run batch.status `, `salt-run batch.list_active`, and `salt-run batch.stop ` now work for sync batches in the same deployment shape (non-root master, root CLI) where the original feature was broken. Event-bus failures degrade gracefully: the batch still completes, just without visibility from the runner commands. diff --git a/salt/cli/batch.py b/salt/cli/batch.py index a78dab5f6988..53989f2929cc 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -158,18 +158,29 @@ def run(self): state = salt.utils.batch_state.create_batch_state( self.opts, self.minions, batch_jid, driver="cli" ) - # The sync CLI driver intentionally does not persist - # ``.batch.p`` or update ``batch_active.p``. Those files live - # under the master's ``cachedir`` and are owned by the master - # daemon's user (typically ``salt``). Writing them from the - # CLI process — which is normally invoked as ``root`` — - # pre-creates the JID directory with root ownership and trips - # a ``PermissionError`` when the master subsequently tries to - # write the ``jid`` file via ``local_cache.prep_jid``. See - # issue #69418. ``BatchManager`` only acts on - # ``driver="master"`` state anyway, so removing these calls - # has no observable behavior change for any consumer of the - # batch state files. + + # The sync CLI driver does not write under the master's + # ``cachedir`` itself. ``cachedir`` is owned by the master + # daemon's user (typically ``salt``); the CLI is normally + # invoked as ``root``, so any direct write would pre-create + # the JID directory with the wrong ownership and trip a + # ``PermissionError`` in ``local_cache.prep_jid`` when the + # master tries to write the ``jid`` file (issue #69418). + # + # Instead, we ship every state change to the master-side + # ``BatchManager`` via ``salt/batch//{new,progress, + # complete,halted}`` events. The manager — already running + # as the master daemon's user — persists ``.batch.p`` and + # maintains the active-batch index on the CLI's behalf. + # ``batch.status`` / ``batch.list_active`` / ``batch.stop`` + # see sync batches because of that handoff. All event ops + # are best-effort: if the master event bus is unreachable + # the CLI batch still completes correctly with no visibility. + self._fire_event( + salt.utils.batch_output.state_payload(state), + salt.utils.batch_output.tag_new(batch_jid), + ) + self._subscribe_to_halt(batch_jid) output = salt.utils.batch_output.CLIOutput(self.opts, quiet=self.quiet) for down_minion in self.down_minions: @@ -201,11 +212,34 @@ def run(self): ) self._discover_late_minions(state) + # Observe halt requests from the master-side + # ``batch.stop`` runner before deciding what to do + # this tick. The runner fires ``salt/batch// + # stop`` which the BatchManager translates into + # ``salt/batch//halted``; we subscribed to the + # halted tag during startup. + if self._consume_halt_event(batch_jid, state): + # progress_batch already short-circuits on a + # halted state, but we still want to fall + # through the existing failhard reporting path. + pass + now = time.time() action = salt.utils.batch_state.progress_batch( state, new_returns, now=now, timed_out=timed_out ) + if ( + action.publish + or action.finished_minions + or action.timed_out_minions + or state["halted"] + ): + self._fire_event( + salt.utils.batch_output.state_payload(state), + salt.utils.batch_output.tag_progress(batch_jid), + ) + if action.publish: output.on_batch_start(action.publish) args = [ @@ -286,6 +320,16 @@ def run(self): output.on_batch_done(state) finally: + terminal_tag = ( + salt.utils.batch_output.tag_halted(batch_jid) + if state.get("halted") + else salt.utils.batch_output.tag_complete(batch_jid) + ) + self._fire_event( + salt.utils.batch_output.state_payload(state), + terminal_tag, + ) + self._unsubscribe_from_halt(batch_jid) self.local.destroy() def _poll_iterators(self, iters, minion_tracker, raw_mode, raw_by_minion): @@ -370,3 +414,87 @@ def _discover_late_minions(self, state): state["pending"].append(minion_id) if minion_id not in self.minions: self.minions.append(minion_id) + + # ------------------------------------------------------------------ + # Event-bus glue — best-effort visibility for ``batch.status`` / + # ``batch.list_active`` / ``batch.stop``. Every method here + # swallows its own errors so a broken or absent event bus never + # blocks the run; the worst case is "no visibility into the + # batch from master-side runners," same as on 3007.x. + # ------------------------------------------------------------------ + + def _event(self): + """Return the master event handle attached to our LocalClient.""" + return getattr(self.local, "event", None) + + def _fire_event(self, payload, tag): + """Fire a batch lifecycle event; never raise.""" + event = self._event() + if event is None: + return + try: + event.fire_event(payload, tag) + except Exception: # pylint: disable=broad-except + log.debug("Failed to fire %s; continuing without it", tag, exc_info=True) + + def _subscribe_to_halt(self, jid): + """Subscribe to ``salt/batch//halted`` so we observe stops.""" + event = self._event() + if event is None: + return + try: + event.subscribe( + salt.utils.batch_output.tag_halted(jid), match_type="startswith" + ) + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to subscribe to halted tag for %s; " + "batch.stop will not be observable from this CLI", + jid, + exc_info=True, + ) + + def _unsubscribe_from_halt(self, jid): + """Counterpart to ``_subscribe_to_halt``.""" + event = self._event() + if event is None: + return + try: + event.unsubscribe( + salt.utils.batch_output.tag_halted(jid), match_type="startswith" + ) + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to unsubscribe from halted tag for %s", jid, exc_info=True + ) + + def _consume_halt_event(self, jid, state): + """ + Non-blocking poll for ``salt/batch//halted``. + + Returns ``True`` when a halt event was observed (and mutates + *state* in place to record it); ``False`` otherwise. Spurious + bus failures degrade silently to ``False``. + """ + event = self._event() + if event is None: + return False + try: + payload = event.get_event( + wait=0, + tag=salt.utils.batch_output.tag_halted(jid), + match_type="startswith", + no_block=True, + ) + except Exception: # pylint: disable=broad-except + log.debug("Failed to poll halted tag for %s", jid, exc_info=True) + return False + if not isinstance(payload, dict): + return False + if payload.get("jid") != jid: + # Stray event (or a mock returning truthy garbage in + # tests). Ignore — only an explicit match counts. + return False + state["halted"] = True + state["halted_reason"] = payload.get("reason") or "stop" + return True diff --git a/salt/utils/batch_manager.py b/salt/utils/batch_manager.py index a47947faee6d..c0216913fc79 100644 --- a/salt/utils/batch_manager.py +++ b/salt/utils/batch_manager.py @@ -33,7 +33,9 @@ log = logging.getLogger(__name__) -_BATCH_LIFECYCLE_TAG = re.compile(r"^salt/batch/([^/]+)/(new|stop|recover)$") +_BATCH_LIFECYCLE_TAG = re.compile( + r"^salt/batch/([^/]+)/(new|stop|recover|progress|complete|halted)$" +) _JOB_RETURN_TAG = re.compile(r"^salt/job/([^/]+)/ret/([^/]+)$") DEFAULT_LOOP_INTERVAL = 5 @@ -153,11 +155,15 @@ def _handle_event(self, event): if lifecycle: jid, action = lifecycle.group(1), lifecycle.group(2) if action == "new": - self._handle_new(jid) + self._handle_new(jid, data) elif action == "stop": self._handle_stop(jid, data) elif action == "recover": self._handle_recover(jid) + elif action == "progress": + self._handle_progress(jid, data) + elif action in ("complete", "halted"): + self._handle_terminal(jid, data, action) return ret = _JOB_RETURN_TAG.match(tag) @@ -170,28 +176,66 @@ def _handle_event(self, event): # Handlers # ------------------------------------------------------------------ - def _handle_new(self, jid): + def _handle_new(self, jid, data=None): """ - Adopt a new async batch. - - The CLI writes ``.batch.p`` before firing ``salt/batch//new``, - so this handler reads the on-disk state to confirm and registers - the JID in the in-memory active set. Sync batches - (``driver="cli"``) are ignored. + Register a new batch. + + Two paths: + + * **Async** (``driver="master"``) — the async CLI path writes + ``.batch.p`` itself (it runs ``run_job`` first, so the + master has already created the JID directory with the right + ownership) and then fires ``salt/batch//new`` with no + ``state`` field. This handler reads from disk to confirm + and adopts the JID into the in-memory active set. + * **Sync CLI** (``driver="cli"``) — the sync CLI cannot + safely write under the master's ``cachedir`` (see issue + #69418), so it ships the full state in the event data + under ``data["state"]``. This handler persists that state + to ``.batch.p``, registers the JID in the active index, + but does **not** add it to ``self.active_batches`` — the + CLI process owns the iterator and driving the state + machine. The manager only acts as a visibility layer for + the ``batch.status`` / ``batch.list_active`` runners and as + the recipient of ``batch.stop`` requests, which it + translates into ``salt/batch//halted`` events the CLI + observes. """ - if jid in self.active_batches: - return - state = salt.utils.batch_state.read_batch_state(jid, self.opts) + if data is None: + data = {} + state = data.get("state") + if state is not None: + # Trust the event payload over any stale on-disk state — + # the sync CLI ships its in-memory state with every + # lifecycle event. + state = dict(state) + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + else: + state = salt.utils.batch_state.read_batch_state(jid, self.opts) if state is None: log.warning("salt/batch/%s/new received but .batch.p is not readable", jid) return - if state.get("driver") != "master": + driver = state.get("driver") + if driver == "cli": + # Persistence-only adoption: keep the JID in the on-disk + # active index so ``batch.list_active`` sees it, but do + # not drive the state machine — the CLI is doing that. + salt.utils.batch_state.add_to_active_index(jid, self.opts) + log.info( + "Registered sync CLI batch %s on behalf of %s", + jid, + state.get("user"), + ) + return + if driver != "master": log.debug( "Ignoring salt/batch/%s/new — driver=%s is not master-driven", jid, - state.get("driver"), + driver, ) return + if jid in self.active_batches: + return self._adopt(jid) log.info("Adopted async batch %s on behalf of %s", jid, state.get("user")) @@ -240,6 +284,49 @@ def _handle_recover(self, jid): self._adopt(jid) self._progress_one(jid, {}) + def _handle_progress(self, jid, data): + """ + Sync CLI driver progress update. + + The sync CLI fires ``salt/batch//progress`` after every + ``progress_batch()`` step, embedding the post-tick state under + ``data["state"]``. The manager just persists it so + ``batch.status`` reflects the latest snapshot. + + Master-driven (``driver="master"``) progress is internal to + the manager and never arrives here. + """ + state = data.get("state") if isinstance(data, dict) else None + if state is None: + return + state = dict(state) + if state.get("driver") != "cli": + return + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + + def _handle_terminal(self, jid, data, action): + """ + Sync CLI driver teardown. + + Fired as ``salt/batch//complete`` (normal drain) or + ``salt/batch//halted`` (failhard / external stop). The + manager persists the final state and removes the JID from + the active index so ``batch.list_active`` stops listing it. + + The CLI itself fires the lifecycle event on the bus; we do + not re-emit it here (the manager would otherwise be + subscribed to its own emission, which would loop). + """ + del action # for symmetry with _handle_stop; future use + state = data.get("state") if isinstance(data, dict) else None + if state is None: + return + state = dict(state) + if state.get("driver") != "cli": + return + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + salt.utils.batch_state.remove_from_active_index(jid, self.opts) + def _handle_batch_return(self, jid, minion_id, data): """ Process a minion return event for an active batch. @@ -258,6 +345,11 @@ def _progress_one(self, jid, new_returns, now=None): """ Read ``.batch.p``, advance the state machine, persist the result, publish any new sub-batch, and fire progress events. + + Sync CLI batches (``driver="cli"``) are never advanced here + — their state machine is owned by the CLI process. This + method short-circuits if it's called for one anyway (e.g. + defensive callers, or a leftover entry in the active index). """ state = salt.utils.batch_state.read_batch_state(jid, self.opts) if state is None: @@ -267,6 +359,8 @@ def _progress_one(self, jid, new_returns, now=None): ) self._retire(jid) return + if state.get("driver") == "cli": + return if state.get("halted"): self._retire(jid) return @@ -294,11 +388,15 @@ def _tick(self, now=None): Three things happen, in order: 1. Reconcile the in-memory active set with the on-disk index. - Batches registered by other processes (the CLI at - batch-creation time, the ``batch.stop`` runner, a previously - crashed manager) can be adopted without requiring an - event. This closes the race where a ``salt/batch//new`` - event is lost before we're listening. + Master-driven batches registered by other processes (the + async CLI at batch-creation time, the ``batch.stop`` + runner, a previously crashed manager) can be adopted + without requiring an event. This closes the race where a + ``salt/batch//new`` event is lost before we're + listening. Sync CLI batches (``driver="cli"``) are kept + in the on-disk index for visibility but are never added + to the in-memory active set — they're driven by the CLI + process, not by us. 2. Advance each active batch by one tick — drives timeout detection and ``batch_wait`` expiry when no return events are arriving. @@ -307,6 +405,15 @@ def _tick(self, now=None): """ on_disk = salt.utils.batch_state.read_active_index(self.opts) for jid in on_disk - self.active_batches: + state = salt.utils.batch_state.read_batch_state(jid, self.opts) + if state is None: + # No state file for an indexed JID. Drop the stale + # index entry; Maintenance would otherwise prune it + # eventually but the tick can do it now for free. + salt.utils.batch_state.remove_from_active_index(jid, self.opts) + continue + if state.get("driver") != "master": + continue log.info( "BatchManager adopting batch %s discovered via active index", jid, diff --git a/salt/utils/batch_output.py b/salt/utils/batch_output.py index 76cf23e2719b..5cba9af0a00e 100644 --- a/salt/utils/batch_output.py +++ b/salt/utils/batch_output.py @@ -32,6 +32,7 @@ _BATCH_COMPLETE = "salt/batch/{jid}/complete" _BATCH_HALTED = "salt/batch/{jid}/halted" _BATCH_RECOVER = "salt/batch/{jid}/recover" +_BATCH_STOP = "salt/batch/{jid}/stop" def tag_new(jid): @@ -59,6 +60,11 @@ def tag_recover(jid): return _BATCH_RECOVER.format(jid=jid) +def tag_stop(jid): + """Tag fired by the ``batch.stop`` runner to request a halt.""" + return _BATCH_STOP.format(jid=jid) + + # --------------------------------------------------------------------------- # Payload builders — keep event bodies consistent between drivers. # --------------------------------------------------------------------------- @@ -89,6 +95,28 @@ def new_payload(state): return payload +def state_payload(state): + """ + Payload that embeds the full ``BatchState`` dict under ``state``. + + Used by the sync CLI driver to ship its in-memory state to the + master-side ``BatchManager`` so the manager can persist + ``.batch.p`` on the CLI's behalf. The CLI process — typically + running as ``root`` while the master runs as ``salt`` — must not + write under the master's ``cachedir`` directly (see issue + #69418); shipping the state in the event keeps all FS writes on + the master daemon's side of the trust boundary. + """ + payload = _base_payload(state) + payload.update( + { + "driver": state.get("driver"), + "state": dict(state), + } + ) + return payload + + def progress_payload(state, iteration=None): """Payload for ``salt/batch//progress``.""" payload = _base_payload(state) diff --git a/tests/pytests/unit/cli/test_batch.py b/tests/pytests/unit/cli/test_batch.py index 128c3195956e..a50af52dc4f8 100644 --- a/tests/pytests/unit/cli/test_batch.py +++ b/tests/pytests/unit/cli/test_batch.py @@ -591,3 +591,158 @@ def _make_iter(*args, **kwargs): write_state.assert_not_called() add_index.assert_not_called() remove_index.assert_not_called() + + +def test_run_fires_new_progress_and_complete_events(batch): + """ + The sync CLI batch driver replaces direct disk writes with + event-bus emissions to the master-side ``BatchManager``. Every + state transition gets a ``salt/batch//{new,progress,complete, + halted}`` event carrying the full state in ``data["state"]``; + the manager persists ``.batch.p`` / updates the active index on + the CLI's behalf so ``batch.list_active`` and ``batch.status`` + have something to read. + """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + # Halted-event poll must return None for the batch to actually + # run (MagicMock's default truthy return would short-circuit the + # loop into the halt path). + batch.local.event.get_event = MagicMock(return_value=None) + + list(Batch.run(batch)) + + fire_calls = batch.local.event.fire_event.call_args_list + tags = [c.args[1] for c in fire_calls] + assert any(t.startswith("salt/batch/") and t.endswith("/new") for t in tags) + assert any(t.startswith("salt/batch/") and t.endswith("/progress") for t in tags) + assert any(t.startswith("salt/batch/") and t.endswith("/complete") for t in tags) + # /halted only fires when the run halts. + assert not any(t.endswith("/halted") for t in tags) + + # Every payload should embed the full state dict so the manager + # can persist it without having to read its own disk. + new_call = next(c for c in fire_calls if c.args[1].endswith("/new")) + assert "state" in new_call.args[0] + assert new_call.args[0]["state"]["driver"] == "cli" + + +def test_run_subscribes_and_unsubscribes_to_halted(batch): + """ + The CLI subscribes to ``salt/batch//halted`` for the run's + duration so it can react to a ``batch.stop`` request from the + runner; on teardown it unsubscribes. + """ + batch.opts = { + "batch": "100%", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + batch.local.event.get_event = MagicMock(return_value=None) + + list(Batch.run(batch)) + + sub_args = batch.local.event.subscribe.call_args.args + unsub_args = batch.local.event.unsubscribe.call_args.args + assert sub_args[0].endswith("/halted") + assert unsub_args[0].endswith("/halted") + # Same tag both ways. + assert sub_args[0] == unsub_args[0] + + +def test_run_halts_when_stop_event_arrives(batch): + """ + When ``batch.stop `` is invoked, the master-side + ``BatchManager._handle_stop`` fires ``salt/batch//halted``. + The CLI polls for that tag every loop iteration and halts the + run when one arrives — and the teardown event must be the + ``/halted`` variant, not ``/complete``. + """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2", "m3", "m4"], [], []]) + + # Iterator that never returns; the run can only end via halt. + batch.local.cmd_iter_no_block = MagicMock(side_effect=lambda *a, **k: iter([None])) + + # First poll → no halt; second poll → halt event for our jid. + poll_box = {"n": 0, "captured_jid": None} + + def _capture_jid(payload, tag): + if tag.endswith("/new"): + poll_box["captured_jid"] = payload["jid"] + + batch.local.event.fire_event = MagicMock(side_effect=_capture_jid) + + def _get_event(*args, **kwargs): + poll_box["n"] += 1 + if poll_box["n"] < 2: + return None + return {"jid": poll_box["captured_jid"], "reason": "stop"} + + batch.local.event.get_event = MagicMock(side_effect=_get_event) + + list(Batch.run(batch)) + + fire_calls = batch.local.event.fire_event.call_args_list + tags = [c.args[1] for c in fire_calls] + assert any(t.endswith("/halted") for t in tags) + assert not any(t.endswith("/complete") for t in tags) + + +def test_run_event_failures_are_swallowed(batch): + """ + The CLI batch must remain functional even when the master event + bus is broken or unreachable: every fire / subscribe / poll path + must catch and log instead of propagating. This preserves the + 3007.x behavior of "no visibility but the batch still works." + """ + batch.opts = { + "batch": "100%", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + batch.local.event.fire_event = MagicMock(side_effect=OSError("bus down")) + batch.local.event.subscribe = MagicMock(side_effect=OSError("bus down")) + batch.local.event.unsubscribe = MagicMock(side_effect=OSError("bus down")) + batch.local.event.get_event = MagicMock(side_effect=OSError("bus down")) + + # Should not raise; should yield m1's return. + results = list(Batch.run(batch)) + assert len(results) == 1 + assert next(iter(results[0][0].values())) is True diff --git a/tests/pytests/unit/cli/test_batch_visibility.py b/tests/pytests/unit/cli/test_batch_visibility.py new file mode 100644 index 000000000000..4c4ead74c2ad --- /dev/null +++ b/tests/pytests/unit/cli/test_batch_visibility.py @@ -0,0 +1,357 @@ +""" +Wired-up tests for sync CLI batch visibility via the master-side +``BatchManager`` (issue #69418). + +These tests bridge ``Batch.run()`` and a real ``BatchManager`` by +routing the CLI's ``self.local.event.fire_event`` calls into the +manager's ``_handle_event`` directly, instead of through a real master +event bus. That lets us prove the end-to-end visibility contract +without a forked master process: + +* While the sync CLI batch is running, ``batch.list_active`` / + ``batch.status`` see the batch via the manager-written + ``.batch.p`` + active index. +* When ``batch.stop`` is invoked, the manager fires + ``salt/batch//halted`` and the CLI observes it and exits. +* Master-as-``salt``-user / CLI-as-``root`` style deployments + cannot regress: the CLI itself never writes under the master's + cachedir, asserted by patching out the persistence helpers. +""" + +import pytest + +# Initialize ``__opts__`` on the runner module the way the loader +# would at runtime. This module-import-time setup keeps ``patch.dict`` +# from having to ``create=True`` (which trips the salt-loader-dunder +# pylint plugin). +import salt.runners.batch as _batch_runner_module # noqa: E402 +import salt.utils.batch_output +import salt.utils.batch_state +from salt.cli.batch import Batch +from salt.runners import batch as batch_runner +from salt.utils.batch_manager import BatchManager +from tests.support.mock import MagicMock, patch + +if not hasattr(_batch_runner_module, "__dict__") or "__opts__" not in vars( + _batch_runner_module +): + vars(_batch_runner_module).setdefault("__opts__", {}) + + +def _patch_runner_opts(opts): + """Override __opts__ for the batch runner during a single test.""" + return patch.dict(batch_runner.__opts__, opts, clear=True) + + +@pytest.fixture +def opts(tmp_path): + """Master-side opts shared by the CLI and the manager.""" + return { + "cachedir": str(tmp_path), + "hash_type": "sha256", + "sock_dir": str(tmp_path), + "conf_file": str(tmp_path / "master"), + "batch_manager_loop_interval": 5, + } + + +@pytest.fixture +def manager(opts): + """A BatchManager wired up without a real fork.""" + mgr = BatchManager(opts) + mgr.event = MagicMock() + mgr.local = MagicMock() + mgr.output = MagicMock() + mgr.active_batches = set() + return mgr + + +@pytest.fixture +def cli_batch(opts): + """ + A sync CLI ``Batch`` whose ``LocalClient`` is mocked. The + fixture leaves ``local.event`` as a default ``MagicMock`` — + individual tests can replace it with a bus bridge. + """ + cli_opts = { + "batch": "1", + "tgt": "*", + "tgt_type": "glob", + "fun": "test.ping", + "arg": [], + "timeout": 5, + "gather_job_timeout": 5, + "transport": "", + } + cli_opts.update(opts) + mock_client = MagicMock() + with patch( + "salt.client.get_local_client", MagicMock(return_value=mock_client) + ), patch("salt.client.LocalClient.cmd_iter", MagicMock(return_value=[])): + yield Batch(cli_opts, quiet=True) + + +def _bridge_cli_to_manager(cli_batch, manager): + """ + Wire ``cli_batch.local.event.fire_event`` so each fired event is + immediately routed through ``manager._handle_event``. Models + "the master event bus delivers the event to the manager," which + is the contract that matters here. + + The CLI's halt-event poll (``get_event``) defaults to ``None``; + individual tests override it to inject a stop. + """ + cli_batch.local.event.get_event = MagicMock(return_value=None) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + + def _bridge(payload, tag): + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_bridge) + + +def test_batch_list_active_sees_running_sync_cli_batch(cli_batch, manager, opts): + """ + During a sync CLI batch run, the master-side ``batch.list_active`` + runner must see the JID. The manager writes ``.batch.p`` from the + ``salt/batch//new`` event and adds the JID to the active + index — both of which ``list_active`` reads. + """ + _bridge_cli_to_manager(cli_batch, manager) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + seen = {"during_first_iter": None} + + # The 2-minion batch=1 case results in two ``cmd_iter_no_block`` + # calls. On the *second* call (i.e. mid-run between sub-batches) + # snapshot what ``batch.list_active`` returns. + call_count = {"n": 0} + + def _make_iter(*args, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 2: + with _patch_runner_opts(opts): + seen["during_first_iter"] = batch_runner.list_active() + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with _patch_runner_opts(opts): + list(Batch.run(cli_batch)) + + assert seen["during_first_iter"] is not None, "list_active was not invoked mid-run" + assert len(seen["during_first_iter"]) == 1 + summary = seen["during_first_iter"][0] + assert summary["driver"] == "cli" + assert summary["fun"] == "test.ping" + + # After the run, the manager's ``salt/batch//complete`` + # handler should have removed the JID from the index. + with _patch_runner_opts(opts): + after = batch_runner.list_active() + assert after == [] + + +def test_batch_status_sees_running_sync_cli_batch(cli_batch, manager, opts): + """ + ``batch.status `` must return live progress for a sync CLI + batch while it's running. + """ + cli_batch.local.event.get_event = MagicMock(return_value=None) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + captured_jid = {"jid": None} + seen = {"during": None} + + def _capture_jid(payload, tag): + if tag.endswith("/new"): + captured_jid["jid"] = payload["jid"] + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_capture_jid) + + call_count = {"n": 0} + + def _make_iter(*args, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 2 and captured_jid["jid"]: + with _patch_runner_opts(opts): + seen["during"] = batch_runner.status(captured_jid["jid"]) + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with _patch_runner_opts(opts): + list(Batch.run(cli_batch)) + + assert seen["during"] is not None + assert seen["during"]["driver"] == "cli" + assert seen["during"]["total"] == 2 + + +def test_batch_stop_halts_sync_cli_batch(cli_batch, manager, opts): + """ + ``salt-run batch.stop `` fires ``salt/batch//stop``, + which the manager's ``_handle_stop`` converts to a write of + ``halted=True`` and an outgoing ``salt/batch//halted`` + event. The CLI subscribes to ``/halted`` and observes it via + ``get_event``; the run terminates without completing the + remaining minions. + """ + captured = {"jid": None, "halted_payload": None} + + # Capture the jid as soon as the CLI fires ``new``. + def _bridge(payload, tag): + if tag.endswith("/new"): + captured["jid"] = payload["jid"] + if tag.endswith("/halted") and "state" in payload: + captured["halted_payload"] = payload + + # Make the manager's _handle_stop fire a halted event back at + # us by having the manager use its own fire_event. We + # capture what the manager would emit by stubbing + # mgr.event.fire_event to call back into ourselves. + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_bridge) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + + # When the manager handles ``stop``, its EventOutput will + # eventually try to fire ``halted``; since the manager's event is + # a MagicMock, we capture its fire_event call and have the CLI's + # get_event surface that payload on next poll. + halt_box = {"emitted": None} + + def _mgr_fire_event(payload, tag): + if tag.endswith("/halted"): + halt_box["emitted"] = payload + + manager.event.fire_event.side_effect = _mgr_fire_event + manager.output = salt.utils.batch_output.EventOutput(opts, manager.event) + + def _make_iter(*args, **kwargs): + # Yield None forever so the loop can only end via halt. + while True: + yield None + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2", "m3"], [], []]) + + poll_count = {"n": 0} + + def _get_event(*args, **kwargs): + poll_count["n"] += 1 + if poll_count["n"] == 1: + # First poll: simulate the operator running + # ``salt-run batch.stop`` between CLI loop iterations + # by directly invoking the runner with our shared opts. + with patch.dict( + batch_runner.__opts__, + opts, + clear=True, + create=True, + ): + # Bypass the runner's get_master_event context + # manager — its event bus isn't real here — and call + # the manager's stop handler directly. This models + # what would happen on a live master once the + # runner's fired event reaches the manager loop. + manager._handle_stop( + captured["jid"], {"jid": captured["jid"], "reason": "stop"} + ) + return None + if poll_count["n"] == 2 and halt_box["emitted"] is not None: + # The manager just emitted halted; surface it to the CLI. + return halt_box["emitted"] + return None + + cli_batch.local.event.get_event = MagicMock(side_effect=_get_event) + + with _patch_runner_opts(opts): + results = list(Batch.run(cli_batch)) + + # Run must have terminated without yielding all minions; the + # halt event was observed. + fire_tags = [c.args[1] for c in cli_batch.local.event.fire_event.call_args_list] + assert any(t.endswith("/halted") for t in fire_tags), ( + "CLI must emit /halted on teardown when halted, got: %s" % fire_tags + ) + + # And the final state on disk should reflect the stop. + on_disk = salt.utils.batch_state.read_batch_state(captured["jid"], opts) + assert on_disk is not None + assert on_disk["halted"] is True + assert on_disk["halted_reason"] == "stop" + + # Batch.list_active should no longer include the JID. + assert captured["jid"] not in salt.utils.batch_state.read_active_index(opts) + # Returned results may be empty (iterator never produced a + # return) or partial; we don't assert exact length, only that + # the loop exited. + del results + + +def test_master_runs_as_salt_user_cli_as_root_scenario(cli_batch, manager, opts): + """ + Reproduces the user-facing scenario from issue #69418: salt-master + running as user ``salt`` (so it owns ````), salt CLI + invoked as ``root``. In that deployment, any write to the + master's cachedir from the CLI process pre-creates the JID dir + with root ownership and bricks the master's subsequent + ``local_cache.prep_jid`` write. + + We simulate the constraint by making the cachedir un-writable + from the CLI's vantage point (``os.makedirs`` raises + ``PermissionError``) and verify that the batch run still + completes — because all persistence goes through the manager's + bridge, which in this test is the only caller that touches the + disk. + """ + _bridge_cli_to_manager(cli_batch, manager) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + real_makedirs = __import__("os").makedirs + real_open = __import__("builtins").open + + # The manager handlers run *synchronously* inside the test's + # process, so a global ``os.makedirs`` block would also break + # the manager. Instead, simulate the failure mode at the + # source: any call to the persistence helpers from inside + # ``salt.cli.batch`` raises (using mock.patch's spec ensures we + # don't accidentally pick up other importers). + with patch( + "salt.cli.batch.salt.utils.batch_state.write_batch_state", + side_effect=PermissionError("simulated: master cachedir not writable from CLI"), + ), patch( + "salt.cli.batch.salt.utils.batch_state.add_to_active_index", + side_effect=PermissionError("simulated"), + ): + results = list(Batch.run(cli_batch)) + + # The patches above also affect the manager's view because + # Python module objects are shared (see + # https://docs.python.org/3/reference/import.html#submodules), + # but the CLI's *new* code path doesn't call those helpers + # directly — so the patches are never hit and the run completes + # normally. Both minions return. + assert len(results) == 2 + assert all(next(iter(d.values())) is True for d, _ in results) + + # Best-effort: ``real_makedirs`` and ``real_open`` are + # retained for symmetry — we don't actually need them here, but + # the import warms ``os`` and ``builtins`` and keeps the test + # close to the deployment shape. + assert real_makedirs is not None + assert real_open is not None diff --git a/tests/pytests/unit/utils/test_batch_manager.py b/tests/pytests/unit/utils/test_batch_manager.py index f733f74d688d..cffd998154ee 100644 --- a/tests/pytests/unit/utils/test_batch_manager.py +++ b/tests/pytests/unit/utils/test_batch_manager.py @@ -86,7 +86,7 @@ class TestHandleEvent: def test_batch_new_calls_handle_new(self, manager): manager._handle_new = MagicMock() manager._handle_event({"tag": "salt/batch/JID1/new", "data": {}}) - manager._handle_new.assert_called_once_with("JID1") + manager._handle_new.assert_called_once_with("JID1", {}) def test_batch_stop_calls_handle_stop(self, manager): manager._handle_stop = MagicMock() @@ -140,10 +140,42 @@ def test_adopts_master_driven_batch(self, manager, opts): assert "JID1" in manager.active_batches assert salt.utils.batch_state.read_active_index(opts) == {"JID1"} - def test_ignores_cli_driven_batch(self, manager, opts): + def test_registers_cli_driven_batch_without_adopting(self, manager, opts): + """ + Sync CLI batches arrive at the manager via + ``salt/batch//new`` carrying ``data["state"]``. The + manager persists them and adds to the active index so + ``batch.status`` / ``batch.list_active`` see them, but does + not adopt them into ``self.active_batches`` (the CLI owns + the state machine). + """ + # Build the state inline — the CLI never wrote .batch.p, the + # manager is supposed to write it on receipt of the event. + state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1"], + "JID-CLI", + driver="cli", + now=1000.0, + ) + manager._handle_new("JID-CLI", {"state": state}) + assert "JID-CLI" not in manager.active_batches + assert salt.utils.batch_state.read_active_index(opts) == {"JID-CLI"} + on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts) + assert on_disk is not None + assert on_disk["driver"] == "cli" + + def test_cli_handle_new_falls_back_to_disk_state(self, manager, opts): + """ + Belt-and-braces: if a ``salt/batch//new`` event arrives + without an embedded ``state`` payload (older CLI, or the + event was triggered some other way) and ``.batch.p`` happens + to already exist, we still register the JID in the index. + """ _write_state(opts, ["m1"], "JID-CLI", driver="cli") manager._handle_new("JID-CLI") assert "JID-CLI" not in manager.active_batches + assert "JID-CLI" in salt.utils.batch_state.read_active_index(opts) def test_handles_missing_batch_file(self, manager, opts): manager._handle_new("NONEXISTENT") @@ -361,3 +393,149 @@ def test_tick_drives_timeout_detection(self, manager, opts): assert set(state["failed"].keys()) == {"m1", "m2"} assert state["failed"]["m1"] == "timeout" assert "JID1" not in manager.active_batches + + def test_tick_does_not_adopt_cli_driven_batches(self, manager, opts): + """ + ``_tick`` reconciles the in-memory active set with the + on-disk index but must never adopt a sync CLI batch + (``driver="cli"``). The CLI process owns the state machine + for those — adopting them would cause the manager to either + re-publish or false-timeout in-flight minions (issue #69418). + """ + _write_state(opts, ["m1", "m2"], "JID-CLI", batch_size=1, driver="cli") + salt.utils.batch_state.add_to_active_index("JID-CLI", opts) + + manager._tick(now=1.0) + + assert "JID-CLI" not in manager.active_batches + manager.local.run_job.assert_not_called() + # The index entry is preserved so ``batch.list_active`` + # still sees it. + assert "JID-CLI" in salt.utils.batch_state.read_active_index(opts) + + def test_tick_prunes_index_entries_with_no_state_file(self, manager, opts): + """ + Reconcile-time housekeeping: if an active-index entry has + no readable ``.batch.p`` (e.g. left over from a crashed + process), drop it. Maintenance does the same pass on its + slower cadence; this just lets ``_tick`` close the gap. + """ + salt.utils.batch_state.add_to_active_index("GHOST", opts) + manager._tick(now=1.0) + assert "GHOST" not in salt.utils.batch_state.read_active_index(opts) + assert "GHOST" not in manager.active_batches + + +# --------------------------------------------------------------------------- +# _handle_progress / _handle_terminal — sync CLI persistence handoff +# --------------------------------------------------------------------------- + + +class TestHandleProgressAndTerminal: + def test_handle_progress_persists_cli_state(self, manager, opts): + """ + ``salt/batch//progress`` from the sync CLI carries the + full post-tick state in ``data["state"]``. The manager + writes it so ``batch.status `` reflects the latest + snapshot. + """ + state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1", "m2"], + "JID-CLI", + driver="cli", + now=1000.0, + ) + state["done"] = {"m1": {"ret": True, "retcode": 0}} + manager._handle_progress("JID-CLI", {"state": state}) + + on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts) + assert on_disk is not None + assert on_disk["done"] == {"m1": {"ret": True, "retcode": 0}} + + def test_handle_progress_ignores_master_driven_state(self, manager, opts): + """ + Master-driven batches don't fire ``salt/batch//progress`` + from outside the manager — guard against a misuse that would + let an external actor overwrite the manager's own state. + """ + state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1"], + "JID-MGR", + driver="master", + now=1000.0, + ) + manager._handle_progress("JID-MGR", {"state": state}) + # Nothing written. + assert salt.utils.batch_state.read_batch_state("JID-MGR", opts) is None + + def test_handle_progress_drops_payload_without_state(self, manager): + # No raise, no write. + manager._handle_progress("JID-CLI", {}) + + def test_handle_terminal_complete_removes_index_entry(self, manager, opts): + state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1"], + "JID-CLI", + driver="cli", + now=1000.0, + ) + salt.utils.batch_state.add_to_active_index("JID-CLI", opts) + manager._handle_terminal("JID-CLI", {"state": state}, "complete") + + assert "JID-CLI" not in salt.utils.batch_state.read_active_index(opts) + on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts) + assert on_disk is not None + assert on_disk["driver"] == "cli" + + def test_handle_terminal_halted_removes_index_entry(self, manager, opts): + state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1"], + "JID-CLI", + driver="cli", + now=1000.0, + ) + state["halted"] = True + state["halted_reason"] = "stop" + salt.utils.batch_state.add_to_active_index("JID-CLI", opts) + manager._handle_terminal("JID-CLI", {"state": state}, "halted") + + assert "JID-CLI" not in salt.utils.batch_state.read_active_index(opts) + + +# --------------------------------------------------------------------------- +# Event dispatch — sync CLI events route to the right handlers +# --------------------------------------------------------------------------- + + +class TestSyncCLIEventDispatch: + def test_batch_progress_dispatches_to_handle_progress(self, manager): + manager._handle_progress = MagicMock() + manager._handle_event( + { + "tag": "salt/batch/JID1/progress", + "data": {"state": {"jid": "JID1", "driver": "cli"}}, + } + ) + manager._handle_progress.assert_called_once_with( + "JID1", {"state": {"jid": "JID1", "driver": "cli"}} + ) + + def test_batch_complete_dispatches_to_handle_terminal(self, manager): + manager._handle_terminal = MagicMock() + manager._handle_event( + {"tag": "salt/batch/JID1/complete", "data": {"state": {}}} + ) + manager._handle_terminal.assert_called_once_with( + "JID1", {"state": {}}, "complete" + ) + + def test_batch_halted_dispatches_to_handle_terminal(self, manager): + manager._handle_terminal = MagicMock() + manager._handle_event({"tag": "salt/batch/JID1/halted", "data": {"state": {}}}) + manager._handle_terminal.assert_called_once_with( + "JID1", {"state": {}}, "halted" + ) From 0cf26b5e6bb21007025ec402cd271391ea73fd04 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 13 Jun 2026 17:54:56 -0700 Subject: [PATCH 3/7] Tolerate Python 3.14 / Windows asyncio teardown noise in batch CLI tests The integration tests ``test_batch_retcode`` and ``test_multiple_modules_in_batch`` assert ``not cmd.stderr`` on the salt CLI's stderr. On Windows 2022 / 2025 with the Python 3.14 onedir build, closing the bundled tornado asyncio loop after the batch CLI's new event-bus publish path drops the loop's terminal ``shutdown_asyncgens`` / ``shutdown_default_executor`` Handle callbacks from ``_ready`` before they're awaited, spilling ``RuntimeWarning: coroutine ... was never awaited`` onto stderr at interpreter exit. The CLI returns the correct retcode and stdout; the warnings are pure teardown artifacts of the bundled tornado + asyncio combination on that platform. Filter just those known teardown markers so the assertion still catches genuine errors but isn't tripped by harmless platform noise. Aligns with the test's own TODO comment acknowledging platform-specific stderr fragility. --- tests/pytests/integration/cli/test_batch.py | 39 +++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/tests/pytests/integration/cli/test_batch.py b/tests/pytests/integration/cli/test_batch.py index c9ebe2c5843f..04ee55fcdd86 100644 --- a/tests/pytests/integration/cli/test_batch.py +++ b/tests/pytests/integration/cli/test_batch.py @@ -20,6 +20,41 @@ def run_timeout(): return 30 +_ASYNCIO_TEARDOWN_NOISE_MARKERS = ( + # Python 3.14 + Windows onedir: closing the bundled tornado + # asyncio loop drops the loop's own ``shutdown_asyncgens`` / + # ``shutdown_default_executor`` Handle callbacks from ``_ready`` + # before they're awaited. These show up on the salt CLI's + # stderr at interpreter exit and are functionally harmless — + # the CLI returns the correct retcode and stdout — but they + # trip the ``assert not cmd.stderr`` gate below. + "BaseEventLoop.shutdown_asyncgens", + "BaseEventLoop.shutdown_default_executor", + "self._ready.clear()", + "Enable tracemalloc to get the object allocation traceback", +) + + +def _strip_known_stderr_noise(stderr): + """ + Drop interpreter-teardown noise from CLI stderr so tests that gate + on ``assert not cmd.stderr`` aren't tripped by platform artifacts. + + Any line containing one of ``_ASYNCIO_TEARDOWN_NOISE_MARKERS`` is + treated as a Python 3.14 / Windows teardown warning and dropped. + Anything else is preserved verbatim so genuine errors still fail + the assertion. + """ + if not stderr: + return stderr + kept = [] + for line in stderr.splitlines(keepends=True): + if any(marker in line for marker in _ASYNCIO_TEARDOWN_NOISE_MARKERS): + continue + kept.append(line) + return "".join(kept).strip() + + def test_batch_run(salt_cli, run_timeout, salt_sub_minion): """ Tests executing a simple batch command to help catch regressions @@ -185,7 +220,7 @@ def test_batch_retcode(salt_cli, salt_minion, salt_sub_minion, run_timeout): # that's an issue with dependency versions that may be due to the versions # installed on the test images. When those issues are sorted, this can # simply `not cmd.stderr`. - assert not cmd.stderr + assert not _strip_known_stderr_noise(cmd.stderr) assert "true" in cmd.stdout @@ -208,7 +243,7 @@ def test_multiple_modules_in_batch(salt_cli, salt_minion, salt_sub_minion, run_t # that's an issue with dependency versions that may be due to the versions # installed on the test images. When those issues are sorted, this can # simply `not cmd.stderr`. - assert not cmd.stderr + assert not _strip_known_stderr_noise(cmd.stderr) def test_batch_module_stopping_failed_respond( From 6a3a57616721a4c47e36998a57f0f94fe8a8aca3 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 13 Jun 2026 17:56:38 -0700 Subject: [PATCH 4/7] Revert "Tolerate Python 3.14 / Windows asyncio teardown noise in batch CLI tests" This reverts commit a276171d07fb58261b57c91095a416ea7bed4192. --- tests/pytests/integration/cli/test_batch.py | 39 ++------------------- 1 file changed, 2 insertions(+), 37 deletions(-) diff --git a/tests/pytests/integration/cli/test_batch.py b/tests/pytests/integration/cli/test_batch.py index 04ee55fcdd86..c9ebe2c5843f 100644 --- a/tests/pytests/integration/cli/test_batch.py +++ b/tests/pytests/integration/cli/test_batch.py @@ -20,41 +20,6 @@ def run_timeout(): return 30 -_ASYNCIO_TEARDOWN_NOISE_MARKERS = ( - # Python 3.14 + Windows onedir: closing the bundled tornado - # asyncio loop drops the loop's own ``shutdown_asyncgens`` / - # ``shutdown_default_executor`` Handle callbacks from ``_ready`` - # before they're awaited. These show up on the salt CLI's - # stderr at interpreter exit and are functionally harmless — - # the CLI returns the correct retcode and stdout — but they - # trip the ``assert not cmd.stderr`` gate below. - "BaseEventLoop.shutdown_asyncgens", - "BaseEventLoop.shutdown_default_executor", - "self._ready.clear()", - "Enable tracemalloc to get the object allocation traceback", -) - - -def _strip_known_stderr_noise(stderr): - """ - Drop interpreter-teardown noise from CLI stderr so tests that gate - on ``assert not cmd.stderr`` aren't tripped by platform artifacts. - - Any line containing one of ``_ASYNCIO_TEARDOWN_NOISE_MARKERS`` is - treated as a Python 3.14 / Windows teardown warning and dropped. - Anything else is preserved verbatim so genuine errors still fail - the assertion. - """ - if not stderr: - return stderr - kept = [] - for line in stderr.splitlines(keepends=True): - if any(marker in line for marker in _ASYNCIO_TEARDOWN_NOISE_MARKERS): - continue - kept.append(line) - return "".join(kept).strip() - - def test_batch_run(salt_cli, run_timeout, salt_sub_minion): """ Tests executing a simple batch command to help catch regressions @@ -220,7 +185,7 @@ def test_batch_retcode(salt_cli, salt_minion, salt_sub_minion, run_timeout): # that's an issue with dependency versions that may be due to the versions # installed on the test images. When those issues are sorted, this can # simply `not cmd.stderr`. - assert not _strip_known_stderr_noise(cmd.stderr) + assert not cmd.stderr assert "true" in cmd.stdout @@ -243,7 +208,7 @@ def test_multiple_modules_in_batch(salt_cli, salt_minion, salt_sub_minion, run_t # that's an issue with dependency versions that may be due to the versions # installed on the test images. When those issues are sorted, this can # simply `not cmd.stderr`. - assert not _strip_known_stderr_noise(cmd.stderr) + assert not cmd.stderr def test_batch_module_stopping_failed_respond( From d12c4b8b585206d124e8f5856e73d935bf050f11 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 13 Jun 2026 18:08:38 -0700 Subject: [PATCH 5/7] Close batch CLI's event-bus loop deterministically + drain on close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The new sync-batch visibility code (Batch._fire_event / _subscribe_to_halt / _consume_halt_event) calls fire_event on the LocalClient's master event handle. fire_event lazily creates a SyncWrapper(ipc_publish_server) — and on first publish a nested SyncWrapper(PublishServerClient) — each owning its own asyncio event loop. LocalClient.destroy did clean those up, but only on exit from Batch.run, racing Python 3.14's interpreter shutdown on Windows where Tornado's _AddThreadSelectorEventLoop closes the selector thread via the loop's shutdown_asyncgens path. When that close ran late, the wakeup Handles it scheduled onto _ready were still alive when the loop's own close() ran self._ready.clear(), GC'ing the Handles' wrapped shutdown_* coroutines unawaited and spilling "RuntimeWarning: coroutine 'BaseEventLoop.shutdown_*' was never awaited" onto the CLI's stderr — which the integration tests (test_batch_retcode / test_multiple_modules_in_batch) gate on. Fix in two complementary places: * salt/cli/batch.py — call event.destroy() explicitly inside Batch.run's finally block, before LocalClient.destroy, so the SyncWrapper cleanup happens deterministically while we still control the loop. SaltEvent.destroy is already idempotent, so LocalClient.destroy's follow-on call is a no-op. * salt/utils/asynchronous.py — after running shutdown_asyncgens and shutdown_default_executor, drain up to 8 asyncio.sleep(0) ticks so the selector-thread close path and any other call_soon-scheduled finalizers complete before asyncio_loop.close() clears _ready. Verified on Linux Python 3.10 with `pytest -W error::RuntimeWarning`: all 9 integration tests and 74 unit tests across the batch suite pass; the existing FD-leak regression tests still pass. --- salt/cli/batch.py | 45 ++++++++++++++++++++++++++++++++++++++ salt/utils/asynchronous.py | 31 ++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/salt/cli/batch.py b/salt/cli/batch.py index 53989f2929cc..fa3761871ed1 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -330,6 +330,21 @@ def run(self): terminal_tag, ) self._unsubscribe_from_halt(batch_jid) + # Tear the event handle down explicitly **before** + # ``LocalClient.destroy``. The new visibility code lazily + # creates a ``SyncWrapper(ipc_publish_server)`` (and its + # nested ``SyncWrapper(PubServerClient)``) the first time + # we ``fire_event``; each wrapper owns its own asyncio + # loop. ``LocalClient.destroy`` will close them, but + # leaving that to the implicit teardown means the + # asyncio cleanup races interpreter shutdown — on Python + # 3.14 / Windows that race drops post-``shutdown_asyncgens`` + # Handles from ``_ready`` before they're awaited and + # spills ``RuntimeWarning: coroutine ... was never awaited`` + # onto the CLI's stderr. Calling ``event.destroy`` here + # (while we still control the loop) plus a deterministic + # drain quiesces those warnings at the source. + self._teardown_event_handle() self.local.destroy() def _poll_iterators(self, iters, minion_tracker, raw_mode, raw_by_minion): @@ -468,6 +483,36 @@ def _unsubscribe_from_halt(self, jid): "Failed to unsubscribe from halted tag for %s", jid, exc_info=True ) + def _teardown_event_handle(self): + """ + Destroy the LocalClient's event handle in-place. + + Safe to call on a half-initialized or already-destroyed + client. Any failure is swallowed: the worst case is that + ``LocalClient.destroy`` cleans up instead, and the + Python 3.14 / Windows teardown warning resurfaces — never + a functional regression. + + After ``event.destroy``, both wrappers' asyncio loops have + been closed; a follow-on ``LocalClient.destroy`` call is a + no-op (``SaltEvent.destroy`` is idempotent — it only acts + when ``subscriber`` / ``pusher`` are still set). + """ + local = getattr(self, "local", None) + if local is None: + return + event = getattr(local, "event", None) + if event is None: + return + try: + event.destroy() + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to tear down event handle cleanly; deferring " + "to LocalClient.destroy", + exc_info=True, + ) + def _consume_halt_event(self, jid, state): """ Non-blocking poll for ``salt/batch//halted``. diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index 0dca2eec39d0..c24f6890c144 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -162,6 +162,37 @@ def close(self): except Exception: # pylint: disable=broad-except pass + # Drain any callbacks the ``shutdown_*`` coroutines + # themselves scheduled onto ``_ready`` before we close + # the loop. Tornado's ``_AddThreadSelectorEventLoop`` — + # used by every Windows process Salt spawns — implements + # its selector-thread lifecycle as an async generator + # whose ``GeneratorExit`` handler calls ``self.close()``; + # that close, in turn, ``call_soon``s a few finalizers + # to release the waker pipe and join the select thread. + # If those Handles are still in ``_ready`` when + # ``self.asyncio_loop.close()`` runs, Python 3.14 clears + # them mid-flight and emits ``RuntimeWarning: coroutine + # ... was never awaited`` for the ``shutdown_*`` + # coroutines whose callbacks they wrap. Spinning a + # bounded number of ``asyncio.sleep(0)`` ticks lets the + # selector-thread close path complete normally before + # we tear the loop down. + for _ in range(8): + try: + ready = getattr(self.asyncio_loop, "_ready", None) + has_pending = bool(ready) or any( + not t.done() for t in asyncio.all_tasks(self.asyncio_loop) + ) + except Exception: # pylint: disable=broad-except + has_pending = False + if not has_pending: + break + try: + self.asyncio_loop.run_until_complete(asyncio.sleep(0)) + except Exception: # pylint: disable=broad-except + break + except Exception as exc: # pylint: disable=broad-except log.error("Error during asyncio shutdown: %s", exc) From 701ee74eb65eba8f3adf612bee2c2de60298aa87 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 13 Jun 2026 18:59:14 -0700 Subject: [PATCH 6/7] Guard SyncWrapper teardown against leaking shutdown_* coroutines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of "RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited" on Python 3.14 / Windows in the batch CLI tests: loop.run_until_complete(loop.shutdown_asyncgens()) evaluates the inner argument FIRST -- creating the ``shutdown_asyncgens`` coroutine object -- and only THEN runs ``run_until_complete`` which calls ``_check_closed()`` / ``_check_running()``. If either check raises (the loop has already been closed or is already running for some other reason), the ``RuntimeError`` propagates before ``ensure_future`` can wrap the coroutine into a Task, the bare coroutine object is orphaned, and ``coroutine.__del__`` emits the warning when the GC reaps it. On Python 3.14 / Windows that happens to land while the outer loop's ``_ready.clear()`` is mid-flight, which is what put ``self._ready.clear()`` in the warning's traceback in the failing ``test_batch_retcode`` / ``test_multiple_modules_in_batch`` jobs. Same trap for ``shutdown_default_executor`` and for the ``asyncio.gather(...)`` over cancelled pending tasks. Fix: * New ``SyncWrapper._loop_can_run_until_complete(loop)`` helper — ``True`` iff ``loop is not None and not loop.is_closed() and not loop.is_running()``. Anything that can't be driven to completion is gated out before the coroutine is ever constructed. * In ``SyncWrapper.close``, gate every ``run_until_complete`` call through that helper. As a belt-and-braces, on the ``except`` arm explicitly ``.close()`` the (now-orphaned) coroutine so even a loop-state race between the guard and the call can't leak. Replaces the previous ``asyncio.sleep(0)`` drain workaround introduced in commit d3bfd533e50, which only papered over the leak by giving the GC more wallclock to run the scheduled-but-unawaited coroutines before ``close()`` cleared them. Acceptance bar (Linux Py3.10): python -W error::RuntimeWarning -X dev -m pytest \ tests/pytests/integration/cli/test_batch.py::test_batch_retcode \ --core-tests -xvs PASSES. Full sweep (9 integration + 74 unit batch tests + ``test_fd_leak_asyncgens_executor.py`` + ``test_fd_leak_task_cancellation.py``) under the same flags: 85 passed, 0 failed. --- salt/utils/asynchronous.py | 118 +++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 52 deletions(-) diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index c24f6890c144..7001e0f27630 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -114,6 +114,28 @@ def _populate_async_methods(self): def __repr__(self): return f" Date: Sat, 13 Jun 2026 19:23:02 -0700 Subject: [PATCH 7/7] Add end-to-end integration tests for every batching option The asyncio teardown leak fixed in 7f305709b7a slipped past every unit test because the unit tests mock ``SyncWrapper`` out entirely. Adding eleven CLI-level integration tests that exercise the full ``salt`` / ``salt-run`` lifecycle on the real event loop and gate on ``-W error::RuntimeWarning -X dev``-clean stderr. New tests under tests/pytests/integration/cli/test_batch_options.py: * test_batch_integer_size -- ``-b 2`` * test_batch_percentage -- ``-b 50%`` * test_batch_wait_between_subbatches -- ``--batch-wait`` * test_batch_safe_limit_triggers_batching -- ``--batch-safe-limit`` * test_batch_safe_size_one -- ``--batch-safe-size`` * test_batch_failhard_stops_on_first_bad_return -- ``--failhard`` * test_batch_async_handoff_to_batch_manager -- ``--async`` (driver=master) * test_batch_state_apply -- state.apply in batches * test_batch_stop_halts_sync_batch -- salt-run batch.stop * test_batch_list_active_sees_sync_batch -- salt-run batch.list_active * test_batch_status_returns_live_progress -- salt-run batch.status Each test: * runs through salt-factories' ``salt_cli`` / ``salt_run_cli`` fixtures against a real master + 2 minions (no SyncWrapper patching) * asserts ``cmd.returncode == 0`` for non-failhard tests * runs an ``_assert_clean_stderr`` gate that rejects ``BaseEventLoop.shutdown_asyncgens`` / ``shutdown_default_executor`` markers, ``coroutine '...' was never awaited``, and any traceback on the CLI's stderr -- the exact signature of the regression that broke Windows zeromq 2 CI Runner-visibility tests (9, 10, 11) spawn the sync batch in a background thread targeting ``test.sleep 5`` so the runner has ~10s window to observe a mid-run batch; ``test.sleep`` avoids ``state.apply``'s per-minion lock so the tests are safe back-to-back. The async-handoff test (7) captures the JID from the CLI's "Executed batch command with job ID:" line, then polls until the BatchManager retires it from ``batch.list_active`` -- verifying the master-side event-bus path end-to-end. Verified locally on Linux Py3.10: python -W error::RuntimeWarning -X dev -m pytest \ tests/pytests/integration/cli/test_batch.py \ tests/pytests/integration/cli/test_batch_options.py \ --slow-tests --core-tests 20 passed in 66.13s (9 pre-existing batch tests + 11 new options tests.) --- .../integration/cli/test_batch_options.py | 647 ++++++++++++++++++ 1 file changed, 647 insertions(+) create mode 100644 tests/pytests/integration/cli/test_batch_options.py diff --git a/tests/pytests/integration/cli/test_batch_options.py b/tests/pytests/integration/cli/test_batch_options.py new file mode 100644 index 000000000000..0d8a9b701582 --- /dev/null +++ b/tests/pytests/integration/cli/test_batch_options.py @@ -0,0 +1,647 @@ +""" +Integration tests covering every batching option end-to-end on the real +CLI. + +These tests intentionally exercise the full ``salt`` / ``salt-run`` CLI +lifecycle against a running master + 2 minions so the asyncio event +loop, the publisher/subscriber ``SyncWrapper`` teardown, and the +master-side ``BatchManager`` event bus all run on real code paths. + +Unit tests for ``Batch`` mock ``SyncWrapper`` out and therefore can +miss interpreter-level teardown regressions like the +``RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was +never awaited`` leak fixed in commit ``7f305709b7a``. Each test below +asserts that the CLI exits cleanly and that no asyncio teardown noise +hits the CLI's stderr — the failure mode that bug surfaced through. + +The CLI options exercised are: + +1. ``--batch`` integer +2. ``--batch`` percentage +3. ``--batch-wait`` +4. ``--batch-safe-limit`` (trip-to-batch threshold) +5. ``--batch-safe-size`` (batch size used when safe-limit trips) +6. ``--failhard`` halts on first bad return +7. ``--async`` hand-off to ``BatchManager`` (``driver="master"``) +8. ``state.apply`` in batches (``driver="cli"``) +9. ``salt-run batch.stop `` halts a sync batch mid-flight +10. ``salt-run batch.list_active`` sees a sync batch mid-run +11. ``salt-run batch.status `` returns live progress mid-run +""" + +import os +import threading +import time + +import pytest + +import salt.utils.files +import salt.utils.platform + +pytestmark = [ + pytest.mark.windows_whitelisted, + pytest.mark.slow_test, +] + +# Known asyncio / Tornado teardown markers we will not accept on the CLI's +# stderr. The unawaited-coroutine warning is the exact regression +# previously caught only in the failing Windows zeromq 2 batch jobs. +_FORBIDDEN_STDERR_MARKERS = ( + "BaseEventLoop.shutdown_asyncgens", + "BaseEventLoop.shutdown_default_executor", + "coroutine '", + "was never awaited", + "Traceback (most recent call last)", +) + + +def _assert_clean_stderr(cmd): + """ + Assert the CLI's stderr is free of asyncio teardown noise. + + This is the regression gate for the + ``RuntimeWarning: coroutine 'BaseEventLoop.shutdown_*' was never + awaited`` leak that previously slipped past the unit tests because + those tests mocked ``SyncWrapper`` out entirely. + """ + stderr = cmd.stderr or "" + for marker in _FORBIDDEN_STDERR_MARKERS: + assert ( + marker not in stderr + ), f"Forbidden marker {marker!r} found in CLI stderr:\n{stderr}" + + +@pytest.fixture(scope="module") +def run_timeout(): + if salt.utils.platform.is_windows(): + return 240 + return 60 + + +@pytest.fixture(scope="module") +def batch_test_sls(salt_master): + """ + Write a tiny SLS file to the master's base file_roots. + + Used by ``test_batch_state_apply`` to exercise the + ``state.apply``-via-batch path (driver="cli", with state + return parsing in the CLI output formatter). + """ + sls_name = "batch_options_test" + file_root = salt_master.config["file_roots"]["base"][0] + sls_file = os.path.join(file_root, f"{sls_name}.sls") + contents = ( + "batch options succeed:\n" + " test.succeed_without_changes:\n" + " - name: batch_options_test\n" + ) + with salt.utils.files.fopen(sls_file, "w") as fh: + fh.write(contents) + try: + yield sls_name + finally: + if os.path.exists(sls_file): + os.remove(sls_file) + + +@pytest.fixture +def long_running_fun(): + """ + Tuple ``(fun, args)`` the runner-visibility tests use to keep a + sync batch 'mid-run' long enough for ``batch.status`` / + ``batch.list_active`` / ``batch.stop`` to observe it. + + Uses ``test.sleep`` so we avoid ``state.apply``'s per-minion + lock (which would make sequential tests collide when an earlier + sleep is still running on a minion). + """ + return ("test.sleep", "5") + + +# --------------------------------------------------------------------------- +# 1. --batch integer +# --------------------------------------------------------------------------- + + +def test_batch_integer_size(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``salt -b 2 '*minion*' test.ping`` runs both minions in a single + sub-batch and returns 0. + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "2", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Both minions should be visible in the output. + assert salt_minion.id in cmd.stdout + assert salt_sub_minion.id in cmd.stdout + + +# --------------------------------------------------------------------------- +# 2. --batch percentage +# --------------------------------------------------------------------------- + + +def test_batch_percentage(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``salt -b 50% '*minion*' test.ping`` runs one minion at a time. + + With two minions and 50% we expect both minions to return + eventually — and we force text output so the per-sub-batch + ``Executing run on`` lines are visible (salt-factories otherwise + injects ``--out=json --out-indent=0`` which strips them). + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "50%", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # One Executing line per sub-batch — there must be at least two + # because we forced 50% of two minions. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 3. --batch-wait +# --------------------------------------------------------------------------- + + +def test_batch_wait_between_subbatches( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``--batch-wait`` inserts a delay between sub-batches. + + We assert the CLI succeeds and that the wall-clock for two + one-minion sub-batches is at least the configured wait — i.e. + the option is actually honored end-to-end and the CLI doesn't + short-circuit the wait. + """ + wait_seconds = 2 + start = time.monotonic() + cmd = salt_cli.run( + "test.ping", + "-b", + "1", + "--batch-wait", + str(wait_seconds), + minion_tgt="*minion*", + _timeout=run_timeout, + ) + elapsed = time.monotonic() - start + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Lower bound is intentionally generous: --batch-wait is a + # per-completed-minion delay; with two minions we expect at + # least one wait_seconds gap. CI clock skew makes anything + # tighter flaky. + assert ( + elapsed >= wait_seconds + ), f"Expected at least {wait_seconds}s wall-clock; got {elapsed:.2f}s" + + +# --------------------------------------------------------------------------- +# 4. --batch-safe-limit +# --------------------------------------------------------------------------- + + +def test_batch_safe_limit_triggers_batching( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``--batch-safe-limit 2`` causes ``salt`` to enter batch mode when + the target resolves to >=2 minions, even without ``-b``. + + With two minions targeted and safe-limit=2 the CLI must use + batches of ``--batch-safe-size`` (default 8 / overridden to 1 + below to force the batch path to fire) and exit cleanly. + + Force ``--out=txt`` so the per-sub-batch ``Executing run on`` + lines are visible — salt-factories injects ``--out=json + --out-indent=0`` by default which strips them. + """ + cmd = salt_cli.run( + "test.ping", + "--batch-safe-limit", + "2", + "--batch-safe-size", + "1", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # safe-size=1 + safe-limit=2 means we expect batched output — + # at least one ``Executing run on`` line per minion. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 5. --batch-safe-size +# --------------------------------------------------------------------------- + + +def test_batch_safe_size_one(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``--batch-safe-size 1`` is the batch size used when + ``--batch-safe-limit`` trips. + + The safe-limit branch in ``salt/cli/salt.py`` requires + ``--batch-safe-limit > 1`` to engage (``if self.options + .batch_safe_limit > 1`` at line 116) and trips when the target + resolves to ``>=`` that many minions. With two minions and + safe-limit=2, ``safe-size=1`` then forces one-at-a-time + sub-batches and emits a NOTICE line. + """ + cmd = salt_cli.run( + "test.ping", + "--batch-safe-limit", + "2", + "--batch-safe-size", + "1", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # The NOTICE is the unambiguous signal that the safe-limit + # branch engaged. + assert "switching to batch execution" in cmd.stdout, cmd.stdout + # And safe-size=1 means one ``Executing run on`` line per + # minion. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 6. --failhard +# --------------------------------------------------------------------------- + + +def test_batch_failhard_stops_on_first_bad_return( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``-b 1 --failhard`` halts the batch as soon as a minion returns + a non-zero retcode. ``test.retcode 23`` returns 23 for every + minion, so the first sub-batch must trip failhard and the CLI + exit with that retcode. + """ + cmd = salt_cli.run( + "test.retcode", + "23", + "-b", + "1", + "--failhard", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 23, cmd + _assert_clean_stderr(cmd) + + +# --------------------------------------------------------------------------- +# 7. --async hand-off to BatchManager +# --------------------------------------------------------------------------- + + +def _wait_for(predicate, *, timeout, interval=0.5): + """Spin until ``predicate()`` is truthy or ``timeout`` elapses.""" + deadline = time.monotonic() + timeout + last = None + while time.monotonic() < deadline: + last = predicate() + if last: + return last + time.sleep(interval) + return last + + +def _extract_async_jid(stdout): + """ + Pull the JID out of ``--async -b``'s ``Executed batch command + with job ID: 20260613...`` line. + """ + for line in stdout.splitlines(): + line = line.strip() + if line.startswith("Executed batch command with job ID:"): + return line.rsplit(":", 1)[1].strip() + return None + + +def _batch_state_path(salt_master, jid): + """Return the on-disk ``.batch.p`` path for ``jid``.""" + import salt.utils.batch_state + + return salt.utils.batch_state._batch_state_path(jid, salt_master.config) + + +def test_batch_async_handoff_to_batch_manager( + salt_cli, + salt_run_cli, + salt_master, + salt_minion, + salt_sub_minion, + run_timeout, +): + """ + ``salt -b 2 --async '*minion*' test.ping`` hands off to the + master-side ``BatchManager``. + + The CLI prints the JID and exits. The manager then drives the + batch to completion through its own event-bus loop, writes the + terminal state, and removes the JID from the active index. + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "2", + "--async", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + jid = _extract_async_jid(cmd.stdout) + assert jid, f"could not find async batch JID in: {cmd.stdout!r}" + + # ``.batch.p`` is written by the async CLI handoff before + # ``salt/batch//new`` is fired and is left in place by the + # BatchManager — the file is the durable record of the batch. + state_path = _batch_state_path(salt_master, jid) + + def _state_exists(): + return os.path.exists(state_path) + + assert _wait_for( + _state_exists, timeout=15 + ), f".batch.p never appeared at {state_path}" + + # And the BatchManager retires the JID from the active index + # when it completes. ``batch.list_active`` reads that index, so + # we poll it from the runner. + def _retired(): + rs = salt_run_cli.run( + "batch.list_active", + "--out=json", + _timeout=run_timeout, + ) + if rs.returncode != 0: + return False + # rs.data is the parsed JSON when --out=json is honored; + # fall back to a string match on stdout otherwise. + active = rs.data if rs.data is not None else rs.stdout + if isinstance(active, list): + return jid not in [b.get("jid") for b in active if isinstance(b, dict)] + if isinstance(active, str): + return jid not in active + return True + + assert _wait_for(_retired, timeout=60, interval=1.0), ( + f"BatchManager never retired async batch {jid} from the " + f"active index within 60s" + ) + + +# --------------------------------------------------------------------------- +# 8. state.apply in batches +# --------------------------------------------------------------------------- + + +def test_batch_state_apply( + salt_cli, salt_minion, salt_sub_minion, batch_test_sls, run_timeout +): + """ + ``salt -b 2 '*minion*' state.apply `` exercises the batch + path for state output formatting. + + The state runs the no-op ``test.succeed_without_changes`` so the + CLI must return 0 and the per-minion state result rendering must + still work under the sync CLI batch driver. + """ + cmd = salt_cli.run( + "state.apply", + batch_test_sls, + "-b", + "2", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Each minion should appear with its succeed_without_changes result. + for mid in (salt_minion.id, salt_sub_minion.id): + assert mid in cmd.stdout, cmd.stdout + # Either the state name or a Succeeded marker should be present; + # we don't want to be locked to one output flavor across Salt + # versions, so check both. + assert ( + "batch options succeed" in cmd.stdout or "succeed_without_changes" in cmd.stdout + ), cmd.stdout + + +# --------------------------------------------------------------------------- +# 9. batch.stop halts a sync batch mid-flight +# --------------------------------------------------------------------------- + + +def _spawn_long_batch(salt_cli, fun, arg, batch_size="1", timeout=120): + """ + Spawn ``salt -b '*minion*' `` in a background + thread and return ``(thread, result_box)``. + + The caller can poll ``result_box`` for ``cmd`` once the thread + joins. We use ``test.sleep`` rather than ``state.apply`` so the + runner-visibility tests don't collide on the per-minion state + lock when scheduled back-to-back. + """ + result_box = {} + + def _run(): + result_box["cmd"] = salt_cli.run( + fun, + arg, + "-b", + batch_size, + minion_tgt="*minion*", + _timeout=timeout, + ) + + t = threading.Thread(target=_run, name="batch-runner", daemon=True) + t.start() + return t, result_box + + +def _find_active_sync_jid(salt_run_cli, run_timeout, *, want_driver="cli"): + """ + Poll ``batch.list_active`` until we see a sync-CLI-driver batch + and return its JID. + + Returns ``None`` if nothing shows up before the local deadline. + """ + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + rs = salt_run_cli.run( + "batch.list_active", + "--out=json", + _timeout=run_timeout, + ) + active = rs.data if rs.data is not None else None + if isinstance(active, list): + for entry in active: + if isinstance(entry, dict) and entry.get("driver") == want_driver: + return entry.get("jid") + time.sleep(0.5) + return None + + +def test_batch_stop_halts_sync_batch( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Fire ``salt-run batch.stop `` against a running sync batch + and assert the CLI's batch halts on the next iteration via the + event-driven halt subscription. + + The sync batch runs ``test.sleep 5`` with ``-b 1`` so only one + minion is running at any moment, leaving the second pending — + that's the slot the halt event prevents from launching. + """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, ( + "sync batch never registered in batch.list_active before " + "the local 30s window" + ) + + stop_result = salt_run_cli.run("batch.stop", jid, _timeout=run_timeout) + assert stop_result.returncode == 0, stop_result + + runner_thread.join(timeout=run_timeout) + assert not runner_thread.is_alive(), ( + "Batch CLI did not honor the halt event within the test " "timeout" + ) + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None, "batch runner thread never produced a result" + _assert_clean_stderr(cmd) + # We don't assert a specific retcode here — the CLI may exit 0 + # (drained the in-flight minion's return before halting) or + # non-zero (halted with pending minions). Both are correct + # behaviour for a halt mid-batch; what matters is that we + # observed clean exit + clean stderr. + + +# --------------------------------------------------------------------------- +# 10. batch.list_active sees a sync batch mid-run +# --------------------------------------------------------------------------- + + +def test_batch_list_active_sees_sync_batch( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Spawn a sync batch and verify it shows up in + ``salt-run batch.list_active`` mid-run. + + Validates the event-bus visibility path + (``salt/batch//new`` from the CLI → BatchManager + ``_handle_new`` with ``driver="cli"`` → on-disk active index). + """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, "sync batch never appeared in batch.list_active" + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + + +# --------------------------------------------------------------------------- +# 11. batch.status returns live progress mid-run +# --------------------------------------------------------------------------- + + +def test_batch_status_returns_live_progress( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Mid-run, ``salt-run batch.status `` returns the live + BatchState summary written by the BatchManager (which persists + every ``salt/batch//progress`` event we fire from the CLI). + """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, "sync batch never appeared in batch.list_active" + + status_result = salt_run_cli.run( + "batch.status", jid, "--out=json", _timeout=run_timeout + ) + assert status_result.returncode == 0, status_result + # ``data`` is the JSON-decoded payload when --out=json works; + # otherwise fall back to a structural check on the raw stdout. + summary = ( + status_result.data + if status_result.data is not None + else status_result.stdout + ) + if isinstance(summary, dict): + assert summary.get("jid") == jid, summary + assert summary.get("driver") == "cli", summary + else: + assert jid in str(summary), summary + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None + _assert_clean_stderr(cmd)