diff --git a/changelog/69418.fixed.md b/changelog/69418.fixed.md new file mode 100644 index 000000000000..6e84b197ae37 --- /dev/null +++ b/changelog/69418.fixed.md @@ -0,0 +1,3 @@ +Fixed `salt -b` (sync batch mode) failing with `SaltClientError: Some exception handling minion payload` when the salt-master runs as a non-root user (e.g. `salt`). The sync CLI batch driver had been writing batch-state persistence files (`.batch.p`, `batch_active.p`) under the master's `cachedir` from the CLI process — pre-creating the JID directory with root ownership and tripping a `PermissionError` in `local_cache.prep_jid` on the master. + +The sync CLI driver no longer writes anything under the master's `cachedir` itself. Instead it ships every state transition to the master-side `BatchManager` as `salt/batch/<jid>/{new,progress,complete,halted}` events; the manager — already running as the master daemon's user — persists `.batch.p` and maintains the active-batch index on the CLI's behalf. `salt-run batch.status <jid>`, `salt-run batch.list_active`, and `salt-run batch.stop <jid>` now work for sync batches in the same deployment shape (non-root master, root CLI) where the original feature was broken. Event-bus failures degrade gracefully: the batch still completes, just without visibility from the runner commands. diff --git a/salt/cli/batch.py b/salt/cli/batch.py index ff02bb59a754..fa3761871ed1 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -159,12 +159,28 @@ def run(self): self.opts, self.minions, batch_jid, driver="cli" ) - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True - ) - salt.utils.batch_state.add_to_active_index( - batch_jid, self.opts, best_effort=True + # The sync CLI driver does not write under the master's + # ``cachedir`` itself. 
``cachedir`` is owned by the master + # daemon's user (typically ``salt``); the CLI is normally + # invoked as ``root``, so any direct write would pre-create + # the JID directory with the wrong ownership and trip a + # ``PermissionError`` in ``local_cache.prep_jid`` when the + # master tries to write the ``jid`` file (issue #69418). + # + # Instead, we ship every state change to the master-side + # ``BatchManager`` via ``salt/batch//{new,progress, + # complete,halted}`` events. The manager — already running + # as the master daemon's user — persists ``.batch.p`` and + # maintains the active-batch index on the CLI's behalf. + # ``batch.status`` / ``batch.list_active`` / ``batch.stop`` + # see sync batches because of that handoff. All event ops + # are best-effort: if the master event bus is unreachable + # the CLI batch still completes correctly with no visibility. + self._fire_event( + salt.utils.batch_output.state_payload(state), + salt.utils.batch_output.tag_new(batch_jid), ) + self._subscribe_to_halt(batch_jid) output = salt.utils.batch_output.CLIOutput(self.opts, quiet=self.quiet) for down_minion in self.down_minions: @@ -196,11 +212,34 @@ def run(self): ) self._discover_late_minions(state) + # Observe halt requests from the master-side + # ``batch.stop`` runner before deciding what to do + # this tick. The runner fires ``salt/batch// + # stop`` which the BatchManager translates into + # ``salt/batch//halted``; we subscribed to the + # halted tag during startup. + if self._consume_halt_event(batch_jid, state): + # progress_batch already short-circuits on a + # halted state, but we still want to fall + # through the existing failhard reporting path. 
+ pass + now = time.time() action = salt.utils.batch_state.progress_batch( state, new_returns, now=now, timed_out=timed_out ) + if ( + action.publish + or action.finished_minions + or action.timed_out_minions + or state["halted"] + ): + self._fire_event( + salt.utils.batch_output.state_payload(state), + salt.utils.batch_output.tag_progress(batch_jid), + ) + if action.publish: output.on_batch_start(action.publish) args = [ @@ -259,10 +298,6 @@ def run(self): yield {minion_id: {}}, 0 output.on_minion_timeout(minion_id) - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True - ) - # Prune finished iterators; progress_batch already # cleared their minions from state["active"]. iters = [ @@ -285,12 +320,31 @@ def run(self): output.on_batch_done(state) finally: - salt.utils.batch_state.remove_from_active_index( - batch_jid, self.opts, best_effort=True + terminal_tag = ( + salt.utils.batch_output.tag_halted(batch_jid) + if state.get("halted") + else salt.utils.batch_output.tag_complete(batch_jid) ) - salt.utils.batch_state.write_batch_state( - batch_jid, state, self.opts, best_effort=True + self._fire_event( + salt.utils.batch_output.state_payload(state), + terminal_tag, ) + self._unsubscribe_from_halt(batch_jid) + # Tear the event handle down explicitly **before** + # ``LocalClient.destroy``. The new visibility code lazily + # creates a ``SyncWrapper(ipc_publish_server)`` (and its + # nested ``SyncWrapper(PubServerClient)``) the first time + # we ``fire_event``; each wrapper owns its own asyncio + # loop. ``LocalClient.destroy`` will close them, but + # leaving that to the implicit teardown means the + # asyncio cleanup races interpreter shutdown — on Python + # 3.14 / Windows that race drops post-``shutdown_asyncgens`` + # Handles from ``_ready`` before they're awaited and + # spills ``RuntimeWarning: coroutine ... was never awaited`` + # onto the CLI's stderr. 
Calling ``event.destroy`` here + # (while we still control the loop) plus a deterministic + # drain quiesces those warnings at the source. + self._teardown_event_handle() self.local.destroy() def _poll_iterators(self, iters, minion_tracker, raw_mode, raw_by_minion): @@ -375,3 +429,117 @@ def _discover_late_minions(self, state): state["pending"].append(minion_id) if minion_id not in self.minions: self.minions.append(minion_id) + + # ------------------------------------------------------------------ + # Event-bus glue — best-effort visibility for ``batch.status`` / + # ``batch.list_active`` / ``batch.stop``. Every method here + # swallows its own errors so a broken or absent event bus never + # blocks the run; the worst case is "no visibility into the + # batch from master-side runners," same as on 3007.x. + # ------------------------------------------------------------------ + + def _event(self): + """Return the master event handle attached to our LocalClient.""" + return getattr(self.local, "event", None) + + def _fire_event(self, payload, tag): + """Fire a batch lifecycle event; never raise.""" + event = self._event() + if event is None: + return + try: + event.fire_event(payload, tag) + except Exception: # pylint: disable=broad-except + log.debug("Failed to fire %s; continuing without it", tag, exc_info=True) + + def _subscribe_to_halt(self, jid): + """Subscribe to ``salt/batch//halted`` so we observe stops.""" + event = self._event() + if event is None: + return + try: + event.subscribe( + salt.utils.batch_output.tag_halted(jid), match_type="startswith" + ) + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to subscribe to halted tag for %s; " + "batch.stop will not be observable from this CLI", + jid, + exc_info=True, + ) + + def _unsubscribe_from_halt(self, jid): + """Counterpart to ``_subscribe_to_halt``.""" + event = self._event() + if event is None: + return + try: + event.unsubscribe( + salt.utils.batch_output.tag_halted(jid), 
match_type="startswith" + ) + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to unsubscribe from halted tag for %s", jid, exc_info=True + ) + + def _teardown_event_handle(self): + """ + Destroy the LocalClient's event handle in-place. + + Safe to call on a half-initialized or already-destroyed + client. Any failure is swallowed: the worst case is that + ``LocalClient.destroy`` cleans up instead, and the + Python 3.14 / Windows teardown warning resurfaces — never + a functional regression. + + After ``event.destroy``, both wrappers' asyncio loops have + been closed; a follow-on ``LocalClient.destroy`` call is a + no-op (``SaltEvent.destroy`` is idempotent — it only acts + when ``subscriber`` / ``pusher`` are still set). + """ + local = getattr(self, "local", None) + if local is None: + return + event = getattr(local, "event", None) + if event is None: + return + try: + event.destroy() + except Exception: # pylint: disable=broad-except + log.debug( + "Failed to tear down event handle cleanly; deferring " + "to LocalClient.destroy", + exc_info=True, + ) + + def _consume_halt_event(self, jid, state): + """ + Non-blocking poll for ``salt/batch//halted``. + + Returns ``True`` when a halt event was observed (and mutates + *state* in place to record it); ``False`` otherwise. Spurious + bus failures degrade silently to ``False``. + """ + event = self._event() + if event is None: + return False + try: + payload = event.get_event( + wait=0, + tag=salt.utils.batch_output.tag_halted(jid), + match_type="startswith", + no_block=True, + ) + except Exception: # pylint: disable=broad-except + log.debug("Failed to poll halted tag for %s", jid, exc_info=True) + return False + if not isinstance(payload, dict): + return False + if payload.get("jid") != jid: + # Stray event (or a mock returning truthy garbage in + # tests). Ignore — only an explicit match counts. 
+ return False + state["halted"] = True + state["halted_reason"] = payload.get("reason") or "stop" + return True diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index 0dca2eec39d0..7001e0f27630 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -114,6 +114,28 @@ def _populate_async_methods(self): def __repr__(self): return f"/new``, - so this handler reads the on-disk state to confirm and registers - the JID in the in-memory active set. Sync batches - (``driver="cli"``) are ignored. + Register a new batch. + + Two paths: + + * **Async** (``driver="master"``) — the async CLI path writes + ``.batch.p`` itself (it runs ``run_job`` first, so the + master has already created the JID directory with the right + ownership) and then fires ``salt/batch//new`` with no + ``state`` field. This handler reads from disk to confirm + and adopts the JID into the in-memory active set. + * **Sync CLI** (``driver="cli"``) — the sync CLI cannot + safely write under the master's ``cachedir`` (see issue + #69418), so it ships the full state in the event data + under ``data["state"]``. This handler persists that state + to ``.batch.p``, registers the JID in the active index, + but does **not** add it to ``self.active_batches`` — the + CLI process owns the iterator and driving the state + machine. The manager only acts as a visibility layer for + the ``batch.status`` / ``batch.list_active`` runners and as + the recipient of ``batch.stop`` requests, which it + translates into ``salt/batch//halted`` events the CLI + observes. """ - if jid in self.active_batches: - return - state = salt.utils.batch_state.read_batch_state(jid, self.opts) + if data is None: + data = {} + state = data.get("state") + if state is not None: + # Trust the event payload over any stale on-disk state — + # the sync CLI ships its in-memory state with every + # lifecycle event. 
+ state = dict(state) + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + else: + state = salt.utils.batch_state.read_batch_state(jid, self.opts) if state is None: log.warning("salt/batch/%s/new received but .batch.p is not readable", jid) return - if state.get("driver") != "master": + driver = state.get("driver") + if driver == "cli": + # Persistence-only adoption: keep the JID in the on-disk + # active index so ``batch.list_active`` sees it, but do + # not drive the state machine — the CLI is doing that. + salt.utils.batch_state.add_to_active_index(jid, self.opts) + log.info( + "Registered sync CLI batch %s on behalf of %s", + jid, + state.get("user"), + ) + return + if driver != "master": log.debug( "Ignoring salt/batch/%s/new — driver=%s is not master-driven", jid, - state.get("driver"), + driver, ) return + if jid in self.active_batches: + return self._adopt(jid) log.info("Adopted async batch %s on behalf of %s", jid, state.get("user")) @@ -240,6 +284,49 @@ def _handle_recover(self, jid): self._adopt(jid) self._progress_one(jid, {}) + def _handle_progress(self, jid, data): + """ + Sync CLI driver progress update. + + The sync CLI fires ``salt/batch//progress`` after every + ``progress_batch()`` step, embedding the post-tick state under + ``data["state"]``. The manager just persists it so + ``batch.status`` reflects the latest snapshot. + + Master-driven (``driver="master"``) progress is internal to + the manager and never arrives here. + """ + state = data.get("state") if isinstance(data, dict) else None + if state is None: + return + state = dict(state) + if state.get("driver") != "cli": + return + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + + def _handle_terminal(self, jid, data, action): + """ + Sync CLI driver teardown. + + Fired as ``salt/batch//complete`` (normal drain) or + ``salt/batch//halted`` (failhard / external stop). 
The + manager persists the final state and removes the JID from + the active index so ``batch.list_active`` stops listing it. + + The CLI itself fires the lifecycle event on the bus; we do + not re-emit it here (the manager would otherwise be + subscribed to its own emission, which would loop). + """ + del action # for symmetry with _handle_stop; future use + state = data.get("state") if isinstance(data, dict) else None + if state is None: + return + state = dict(state) + if state.get("driver") != "cli": + return + salt.utils.batch_state.write_batch_state(jid, state, self.opts) + salt.utils.batch_state.remove_from_active_index(jid, self.opts) + def _handle_batch_return(self, jid, minion_id, data): """ Process a minion return event for an active batch. @@ -258,6 +345,11 @@ def _progress_one(self, jid, new_returns, now=None): """ Read ``.batch.p``, advance the state machine, persist the result, publish any new sub-batch, and fire progress events. + + Sync CLI batches (``driver="cli"``) are never advanced here + — their state machine is owned by the CLI process. This + method short-circuits if it's called for one anyway (e.g. + defensive callers, or a leftover entry in the active index). """ state = salt.utils.batch_state.read_batch_state(jid, self.opts) if state is None: @@ -267,6 +359,8 @@ def _progress_one(self, jid, new_returns, now=None): ) self._retire(jid) return + if state.get("driver") == "cli": + return if state.get("halted"): self._retire(jid) return @@ -294,11 +388,15 @@ def _tick(self, now=None): Three things happen, in order: 1. Reconcile the in-memory active set with the on-disk index. - Batches registered by other processes (the CLI at - batch-creation time, the ``batch.stop`` runner, a previously - crashed manager) can be adopted without requiring an - event. This closes the race where a ``salt/batch//new`` - event is lost before we're listening. 
+ Master-driven batches registered by other processes (the + async CLI at batch-creation time, the ``batch.stop`` + runner, a previously crashed manager) can be adopted + without requiring an event. This closes the race where a + ``salt/batch//new`` event is lost before we're + listening. Sync CLI batches (``driver="cli"``) are kept + in the on-disk index for visibility but are never added + to the in-memory active set — they're driven by the CLI + process, not by us. 2. Advance each active batch by one tick — drives timeout detection and ``batch_wait`` expiry when no return events are arriving. @@ -307,6 +405,15 @@ def _tick(self, now=None): """ on_disk = salt.utils.batch_state.read_active_index(self.opts) for jid in on_disk - self.active_batches: + state = salt.utils.batch_state.read_batch_state(jid, self.opts) + if state is None: + # No state file for an indexed JID. Drop the stale + # index entry; Maintenance would otherwise prune it + # eventually but the tick can do it now for free. + salt.utils.batch_state.remove_from_active_index(jid, self.opts) + continue + if state.get("driver") != "master": + continue log.info( "BatchManager adopting batch %s discovered via active index", jid, diff --git a/salt/utils/batch_output.py b/salt/utils/batch_output.py index 76cf23e2719b..5cba9af0a00e 100644 --- a/salt/utils/batch_output.py +++ b/salt/utils/batch_output.py @@ -32,6 +32,7 @@ _BATCH_COMPLETE = "salt/batch/{jid}/complete" _BATCH_HALTED = "salt/batch/{jid}/halted" _BATCH_RECOVER = "salt/batch/{jid}/recover" +_BATCH_STOP = "salt/batch/{jid}/stop" def tag_new(jid): @@ -59,6 +60,11 @@ def tag_recover(jid): return _BATCH_RECOVER.format(jid=jid) +def tag_stop(jid): + """Tag fired by the ``batch.stop`` runner to request a halt.""" + return _BATCH_STOP.format(jid=jid) + + # --------------------------------------------------------------------------- # Payload builders — keep event bodies consistent between drivers. 
# --------------------------------------------------------------------------- @@ -89,6 +95,28 @@ def new_payload(state): return payload +def state_payload(state): + """ + Payload that embeds the full ``BatchState`` dict under ``state``. + + Used by the sync CLI driver to ship its in-memory state to the + master-side ``BatchManager`` so the manager can persist + ``.batch.p`` on the CLI's behalf. The CLI process — typically + running as ``root`` while the master runs as ``salt`` — must not + write under the master's ``cachedir`` directly (see issue + #69418); shipping the state in the event keeps all FS writes on + the master daemon's side of the trust boundary. + """ + payload = _base_payload(state) + payload.update( + { + "driver": state.get("driver"), + "state": dict(state), + } + ) + return payload + + def progress_payload(state, iteration=None): """Payload for ``salt/batch//progress``.""" payload = _base_payload(state) diff --git a/tests/pytests/integration/cli/test_batch_options.py b/tests/pytests/integration/cli/test_batch_options.py new file mode 100644 index 000000000000..0d8a9b701582 --- /dev/null +++ b/tests/pytests/integration/cli/test_batch_options.py @@ -0,0 +1,647 @@ +""" +Integration tests covering every batching option end-to-end on the real +CLI. + +These tests intentionally exercise the full ``salt`` / ``salt-run`` CLI +lifecycle against a running master + 2 minions so the asyncio event +loop, the publisher/subscriber ``SyncWrapper`` teardown, and the +master-side ``BatchManager`` event bus all run on real code paths. + +Unit tests for ``Batch`` mock ``SyncWrapper`` out and therefore can +miss interpreter-level teardown regressions like the +``RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was +never awaited`` leak fixed in commit ``7f305709b7a``. Each test below +asserts that the CLI exits cleanly and that no asyncio teardown noise +hits the CLI's stderr — the failure mode that bug surfaced through. 
+ +The CLI options exercised are: + +1. ``--batch`` integer +2. ``--batch`` percentage +3. ``--batch-wait`` +4. ``--batch-safe-limit`` (trip-to-batch threshold) +5. ``--batch-safe-size`` (batch size used when safe-limit trips) +6. ``--failhard`` halts on first bad return +7. ``--async`` hand-off to ``BatchManager`` (``driver="master"``) +8. ``state.apply`` in batches (``driver="cli"``) +9. ``salt-run batch.stop `` halts a sync batch mid-flight +10. ``salt-run batch.list_active`` sees a sync batch mid-run +11. ``salt-run batch.status `` returns live progress mid-run +""" + +import os +import threading +import time + +import pytest + +import salt.utils.files +import salt.utils.platform + +pytestmark = [ + pytest.mark.windows_whitelisted, + pytest.mark.slow_test, +] + +# Known asyncio / Tornado teardown markers we will not accept on the CLI's +# stderr. The unawaited-coroutine warning is the exact regression +# previously caught only in the failing Windows zeromq 2 batch jobs. +_FORBIDDEN_STDERR_MARKERS = ( + "BaseEventLoop.shutdown_asyncgens", + "BaseEventLoop.shutdown_default_executor", + "coroutine '", + "was never awaited", + "Traceback (most recent call last)", +) + + +def _assert_clean_stderr(cmd): + """ + Assert the CLI's stderr is free of asyncio teardown noise. + + This is the regression gate for the + ``RuntimeWarning: coroutine 'BaseEventLoop.shutdown_*' was never + awaited`` leak that previously slipped past the unit tests because + those tests mocked ``SyncWrapper`` out entirely. + """ + stderr = cmd.stderr or "" + for marker in _FORBIDDEN_STDERR_MARKERS: + assert ( + marker not in stderr + ), f"Forbidden marker {marker!r} found in CLI stderr:\n{stderr}" + + +@pytest.fixture(scope="module") +def run_timeout(): + if salt.utils.platform.is_windows(): + return 240 + return 60 + + +@pytest.fixture(scope="module") +def batch_test_sls(salt_master): + """ + Write a tiny SLS file to the master's base file_roots. 
+ + Used by ``test_batch_state_apply`` to exercise the + ``state.apply``-via-batch path (driver="cli", with state + return parsing in the CLI output formatter). + """ + sls_name = "batch_options_test" + file_root = salt_master.config["file_roots"]["base"][0] + sls_file = os.path.join(file_root, f"{sls_name}.sls") + contents = ( + "batch options succeed:\n" + " test.succeed_without_changes:\n" + " - name: batch_options_test\n" + ) + with salt.utils.files.fopen(sls_file, "w") as fh: + fh.write(contents) + try: + yield sls_name + finally: + if os.path.exists(sls_file): + os.remove(sls_file) + + +@pytest.fixture +def long_running_fun(): + """ + Tuple ``(fun, args)`` the runner-visibility tests use to keep a + sync batch 'mid-run' long enough for ``batch.status`` / + ``batch.list_active`` / ``batch.stop`` to observe it. + + Uses ``test.sleep`` so we avoid ``state.apply``'s per-minion + lock (which would make sequential tests collide when an earlier + sleep is still running on a minion). + """ + return ("test.sleep", "5") + + +# --------------------------------------------------------------------------- +# 1. --batch integer +# --------------------------------------------------------------------------- + + +def test_batch_integer_size(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``salt -b 2 '*minion*' test.ping`` runs both minions in a single + sub-batch and returns 0. + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "2", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Both minions should be visible in the output. + assert salt_minion.id in cmd.stdout + assert salt_sub_minion.id in cmd.stdout + + +# --------------------------------------------------------------------------- +# 2. 
--batch percentage +# --------------------------------------------------------------------------- + + +def test_batch_percentage(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``salt -b 50% '*minion*' test.ping`` runs one minion at a time. + + With two minions and 50% we expect both minions to return + eventually — and we force text output so the per-sub-batch + ``Executing run on`` lines are visible (salt-factories otherwise + injects ``--out=json --out-indent=0`` which strips them). + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "50%", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # One Executing line per sub-batch — there must be at least two + # because we forced 50% of two minions. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 3. --batch-wait +# --------------------------------------------------------------------------- + + +def test_batch_wait_between_subbatches( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``--batch-wait`` inserts a delay between sub-batches. + + We assert the CLI succeeds and that the wall-clock for two + one-minion sub-batches is at least the configured wait — i.e. + the option is actually honored end-to-end and the CLI doesn't + short-circuit the wait. + """ + wait_seconds = 2 + start = time.monotonic() + cmd = salt_cli.run( + "test.ping", + "-b", + "1", + "--batch-wait", + str(wait_seconds), + minion_tgt="*minion*", + _timeout=run_timeout, + ) + elapsed = time.monotonic() - start + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Lower bound is intentionally generous: --batch-wait is a + # per-completed-minion delay; with two minions we expect at + # least one wait_seconds gap. 
CI clock skew makes anything + # tighter flaky. + assert ( + elapsed >= wait_seconds + ), f"Expected at least {wait_seconds}s wall-clock; got {elapsed:.2f}s" + + +# --------------------------------------------------------------------------- +# 4. --batch-safe-limit +# --------------------------------------------------------------------------- + + +def test_batch_safe_limit_triggers_batching( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``--batch-safe-limit 2`` causes ``salt`` to enter batch mode when + the target resolves to >=2 minions, even without ``-b``. + + With two minions targeted and safe-limit=2 the CLI must use + batches of ``--batch-safe-size`` (default 8 / overridden to 1 + below to force the batch path to fire) and exit cleanly. + + Force ``--out=txt`` so the per-sub-batch ``Executing run on`` + lines are visible — salt-factories injects ``--out=json + --out-indent=0`` by default which strips them. + """ + cmd = salt_cli.run( + "test.ping", + "--batch-safe-limit", + "2", + "--batch-safe-size", + "1", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # safe-size=1 + safe-limit=2 means we expect batched output — + # at least one ``Executing run on`` line per minion. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 5. --batch-safe-size +# --------------------------------------------------------------------------- + + +def test_batch_safe_size_one(salt_cli, salt_minion, salt_sub_minion, run_timeout): + """ + ``--batch-safe-size 1`` is the batch size used when + ``--batch-safe-limit`` trips. 
+ + The safe-limit branch in ``salt/cli/salt.py`` requires + ``--batch-safe-limit > 1`` to engage (``if self.options + .batch_safe_limit > 1`` at line 116) and trips when the target + resolves to ``>=`` that many minions. With two minions and + safe-limit=2, ``safe-size=1`` then forces one-at-a-time + sub-batches and emits a NOTICE line. + """ + cmd = salt_cli.run( + "test.ping", + "--batch-safe-limit", + "2", + "--batch-safe-size", + "1", + "--out=txt", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # The NOTICE is the unambiguous signal that the safe-limit + # branch engaged. + assert "switching to batch execution" in cmd.stdout, cmd.stdout + # And safe-size=1 means one ``Executing run on`` line per + # minion. + executing_lines = [ + line for line in cmd.stdout.splitlines() if "Executing run on" in line + ] + assert len(executing_lines) >= 2, cmd.stdout + + +# --------------------------------------------------------------------------- +# 6. --failhard +# --------------------------------------------------------------------------- + + +def test_batch_failhard_stops_on_first_bad_return( + salt_cli, salt_minion, salt_sub_minion, run_timeout +): + """ + ``-b 1 --failhard`` halts the batch as soon as a minion returns + a non-zero retcode. ``test.retcode 23`` returns 23 for every + minion, so the first sub-batch must trip failhard and the CLI + exit with that retcode. + """ + cmd = salt_cli.run( + "test.retcode", + "23", + "-b", + "1", + "--failhard", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 23, cmd + _assert_clean_stderr(cmd) + + +# --------------------------------------------------------------------------- +# 7. 
--async hand-off to BatchManager +# --------------------------------------------------------------------------- + + +def _wait_for(predicate, *, timeout, interval=0.5): + """Spin until ``predicate()`` is truthy or ``timeout`` elapses.""" + deadline = time.monotonic() + timeout + last = None + while time.monotonic() < deadline: + last = predicate() + if last: + return last + time.sleep(interval) + return last + + +def _extract_async_jid(stdout): + """ + Pull the JID out of ``--async -b``'s ``Executed batch command + with job ID: 20260613...`` line. + """ + for line in stdout.splitlines(): + line = line.strip() + if line.startswith("Executed batch command with job ID:"): + return line.rsplit(":", 1)[1].strip() + return None + + +def _batch_state_path(salt_master, jid): + """Return the on-disk ``.batch.p`` path for ``jid``.""" + import salt.utils.batch_state + + return salt.utils.batch_state._batch_state_path(jid, salt_master.config) + + +def test_batch_async_handoff_to_batch_manager( + salt_cli, + salt_run_cli, + salt_master, + salt_minion, + salt_sub_minion, + run_timeout, +): + """ + ``salt -b 2 --async '*minion*' test.ping`` hands off to the + master-side ``BatchManager``. + + The CLI prints the JID and exits. The manager then drives the + batch to completion through its own event-bus loop, writes the + terminal state, and removes the JID from the active index. + """ + cmd = salt_cli.run( + "test.ping", + "-b", + "2", + "--async", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + jid = _extract_async_jid(cmd.stdout) + assert jid, f"could not find async batch JID in: {cmd.stdout!r}" + + # ``.batch.p`` is written by the async CLI handoff before + # ``salt/batch//new`` is fired and is left in place by the + # BatchManager — the file is the durable record of the batch. 
+ state_path = _batch_state_path(salt_master, jid) + + def _state_exists(): + return os.path.exists(state_path) + + assert _wait_for( + _state_exists, timeout=15 + ), f".batch.p never appeared at {state_path}" + + # And the BatchManager retires the JID from the active index + # when it completes. ``batch.list_active`` reads that index, so + # we poll it from the runner. + def _retired(): + rs = salt_run_cli.run( + "batch.list_active", + "--out=json", + _timeout=run_timeout, + ) + if rs.returncode != 0: + return False + # rs.data is the parsed JSON when --out=json is honored; + # fall back to a string match on stdout otherwise. + active = rs.data if rs.data is not None else rs.stdout + if isinstance(active, list): + return jid not in [b.get("jid") for b in active if isinstance(b, dict)] + if isinstance(active, str): + return jid not in active + return True + + assert _wait_for(_retired, timeout=60, interval=1.0), ( + f"BatchManager never retired async batch {jid} from the " + f"active index within 60s" + ) + + +# --------------------------------------------------------------------------- +# 8. state.apply in batches +# --------------------------------------------------------------------------- + + +def test_batch_state_apply( + salt_cli, salt_minion, salt_sub_minion, batch_test_sls, run_timeout +): + """ + ``salt -b 2 '*minion*' state.apply `` exercises the batch + path for state output formatting. + + The state runs the no-op ``test.succeed_without_changes`` so the + CLI must return 0 and the per-minion state result rendering must + still work under the sync CLI batch driver. + """ + cmd = salt_cli.run( + "state.apply", + batch_test_sls, + "-b", + "2", + minion_tgt="*minion*", + _timeout=run_timeout, + ) + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + # Each minion should appear with its succeed_without_changes result. 
+ for mid in (salt_minion.id, salt_sub_minion.id): + assert mid in cmd.stdout, cmd.stdout + # Either the state name or a Succeeded marker should be present; + # we don't want to be locked to one output flavor across Salt + # versions, so check both. + assert ( + "batch options succeed" in cmd.stdout or "succeed_without_changes" in cmd.stdout + ), cmd.stdout + + +# --------------------------------------------------------------------------- +# 9. batch.stop halts a sync batch mid-flight +# --------------------------------------------------------------------------- + + +def _spawn_long_batch(salt_cli, fun, arg, batch_size="1", timeout=120): + """ + Spawn ``salt -b '*minion*' `` in a background + thread and return ``(thread, result_box)``. + + The caller can poll ``result_box`` for ``cmd`` once the thread + joins. We use ``test.sleep`` rather than ``state.apply`` so the + runner-visibility tests don't collide on the per-minion state + lock when scheduled back-to-back. + """ + result_box = {} + + def _run(): + result_box["cmd"] = salt_cli.run( + fun, + arg, + "-b", + batch_size, + minion_tgt="*minion*", + _timeout=timeout, + ) + + t = threading.Thread(target=_run, name="batch-runner", daemon=True) + t.start() + return t, result_box + + +def _find_active_sync_jid(salt_run_cli, run_timeout, *, want_driver="cli"): + """ + Poll ``batch.list_active`` until we see a sync-CLI-driver batch + and return its JID. + + Returns ``None`` if nothing shows up before the local deadline. 
+ """ + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: + rs = salt_run_cli.run( + "batch.list_active", + "--out=json", + _timeout=run_timeout, + ) + active = rs.data if rs.data is not None else None + if isinstance(active, list): + for entry in active: + if isinstance(entry, dict) and entry.get("driver") == want_driver: + return entry.get("jid") + time.sleep(0.5) + return None + + +def test_batch_stop_halts_sync_batch( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Fire ``salt-run batch.stop `` against a running sync batch + and assert the CLI's batch halts on the next iteration via the + event-driven halt subscription. + + The sync batch runs ``test.sleep 5`` with ``-b 1`` so only one + minion is running at any moment, leaving the second pending — + that's the slot the halt event prevents from launching. + """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, ( + "sync batch never registered in batch.list_active before " + "the local 30s window" + ) + + stop_result = salt_run_cli.run("batch.stop", jid, _timeout=run_timeout) + assert stop_result.returncode == 0, stop_result + + runner_thread.join(timeout=run_timeout) + assert not runner_thread.is_alive(), ( + "Batch CLI did not honor the halt event within the test " "timeout" + ) + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None, "batch runner thread never produced a result" + _assert_clean_stderr(cmd) + # We don't assert a specific retcode here — the CLI may exit 0 + # (drained the in-flight minion's return before halting) or + # non-zero (halted with pending minions). Both are correct + # behaviour for a halt mid-batch; what matters is that we + # observed clean exit + clean stderr. 
+ + +# --------------------------------------------------------------------------- +# 10. batch.list_active sees a sync batch mid-run +# --------------------------------------------------------------------------- + + +def test_batch_list_active_sees_sync_batch( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Spawn a sync batch and verify it shows up in + ``salt-run batch.list_active`` mid-run. + + Validates the event-bus visibility path + (``salt/batch//new`` from the CLI → BatchManager + ``_handle_new`` with ``driver="cli"`` → on-disk active index). + """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, "sync batch never appeared in batch.list_active" + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None + assert cmd.returncode == 0, cmd + _assert_clean_stderr(cmd) + + +# --------------------------------------------------------------------------- +# 11. batch.status returns live progress mid-run +# --------------------------------------------------------------------------- + + +def test_batch_status_returns_live_progress( + salt_cli, + salt_run_cli, + salt_minion, + salt_sub_minion, + long_running_fun, + run_timeout, +): + """ + Mid-run, ``salt-run batch.status `` returns the live + BatchState summary written by the BatchManager (which persists + every ``salt/batch//progress`` event we fire from the CLI). 
+ """ + runner_thread, result_box = _spawn_long_batch( + salt_cli, *long_running_fun, batch_size="1", timeout=run_timeout + ) + try: + jid = _find_active_sync_jid(salt_run_cli, run_timeout) + assert jid, "sync batch never appeared in batch.list_active" + + status_result = salt_run_cli.run( + "batch.status", jid, "--out=json", _timeout=run_timeout + ) + assert status_result.returncode == 0, status_result + # ``data`` is the JSON-decoded payload when --out=json works; + # otherwise fall back to a structural check on the raw stdout. + summary = ( + status_result.data + if status_result.data is not None + else status_result.stdout + ) + if isinstance(summary, dict): + assert summary.get("jid") == jid, summary + assert summary.get("driver") == "cli", summary + else: + assert jid in str(summary), summary + finally: + runner_thread.join(timeout=run_timeout) + + cmd = result_box.get("cmd") + assert cmd is not None + _assert_clean_stderr(cmd) diff --git a/tests/pytests/unit/cli/test_batch.py b/tests/pytests/unit/cli/test_batch.py index 1e90219e4236..a50af52dc4f8 100644 --- a/tests/pytests/unit/cli/test_batch.py +++ b/tests/pytests/unit/cli/test_batch.py @@ -545,3 +545,204 @@ def _fake_sleep(duration): # dispatch. Without batch_wait, the second dispatch would happen # immediately and no 0.02s sleeps would be issued. assert spin_sleeps[0] >= 1 + + +def test_run_does_not_write_to_master_cachedir(batch): + """ + Regression test for issue #69418. + + The sync CLI batch driver must not write batch-state persistence + files (``.batch.p``, ``batch_active.p``) under the master's + ``cachedir`` from the CLI process. The salt-master daemon is the + sole owner of that directory tree. 
When the CLI is invoked as + ``root`` against a master running as user ``salt``, any + CLI-initiated write (or directory creation) under the master's + cachedir produces root-owned files that the master cannot + subsequently update — which surfaces as a ``PermissionError`` in + ``local_cache.prep_jid`` and ultimately as + ``SaltClientError: Some exception handling minion payload`` on + the user's terminal. + + Assert that ``write_batch_state`` and ``add_to_active_index`` / + ``remove_from_active_index`` are not called by ``Batch.run()``. + """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with patch("salt.utils.batch_state.write_batch_state") as write_state, patch( + "salt.utils.batch_state.add_to_active_index" + ) as add_index, patch( + "salt.utils.batch_state.remove_from_active_index" + ) as remove_index: + list(Batch.run(batch)) + + write_state.assert_not_called() + add_index.assert_not_called() + remove_index.assert_not_called() + + +def test_run_fires_new_progress_and_complete_events(batch): + """ + The sync CLI batch driver replaces direct disk writes with + event-bus emissions to the master-side ``BatchManager``. Every + state transition gets a ``salt/batch//{new,progress,complete, + halted}`` event carrying the full state in ``data["state"]``; + the manager persists ``.batch.p`` / updates the active index on + the CLI's behalf so ``batch.list_active`` and ``batch.status`` + have something to read. 
+ """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + # Halted-event poll must return None for the batch to actually + # run (MagicMock's default truthy return would short-circuit the + # loop into the halt path). + batch.local.event.get_event = MagicMock(return_value=None) + + list(Batch.run(batch)) + + fire_calls = batch.local.event.fire_event.call_args_list + tags = [c.args[1] for c in fire_calls] + assert any(t.startswith("salt/batch/") and t.endswith("/new") for t in tags) + assert any(t.startswith("salt/batch/") and t.endswith("/progress") for t in tags) + assert any(t.startswith("salt/batch/") and t.endswith("/complete") for t in tags) + # /halted only fires when the run halts. + assert not any(t.endswith("/halted") for t in tags) + + # Every payload should embed the full state dict so the manager + # can persist it without having to read its own disk. + new_call = next(c for c in fire_calls if c.args[1].endswith("/new")) + assert "state" in new_call.args[0] + assert new_call.args[0]["state"]["driver"] == "cli" + + +def test_run_subscribes_and_unsubscribes_to_halted(batch): + """ + The CLI subscribes to ``salt/batch//halted`` for the run's + duration so it can react to a ``batch.stop`` request from the + runner; on teardown it unsubscribes. 
+ """ + batch.opts = { + "batch": "100%", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + batch.local.event.get_event = MagicMock(return_value=None) + + list(Batch.run(batch)) + + sub_args = batch.local.event.subscribe.call_args.args + unsub_args = batch.local.event.unsubscribe.call_args.args + assert sub_args[0].endswith("/halted") + assert unsub_args[0].endswith("/halted") + # Same tag both ways. + assert sub_args[0] == unsub_args[0] + + +def test_run_halts_when_stop_event_arrives(batch): + """ + When ``batch.stop `` is invoked, the master-side + ``BatchManager._handle_stop`` fires ``salt/batch//halted``. + The CLI polls for that tag every loop iteration and halts the + run when one arrives — and the teardown event must be the + ``/halted`` variant, not ``/complete``. + """ + batch.opts = { + "batch": "1", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1", "m2", "m3", "m4"], [], []]) + + # Iterator that never returns; the run can only end via halt. + batch.local.cmd_iter_no_block = MagicMock(side_effect=lambda *a, **k: iter([None])) + + # First poll → no halt; second poll → halt event for our jid. 
+ poll_box = {"n": 0, "captured_jid": None} + + def _capture_jid(payload, tag): + if tag.endswith("/new"): + poll_box["captured_jid"] = payload["jid"] + + batch.local.event.fire_event = MagicMock(side_effect=_capture_jid) + + def _get_event(*args, **kwargs): + poll_box["n"] += 1 + if poll_box["n"] < 2: + return None + return {"jid": poll_box["captured_jid"], "reason": "stop"} + + batch.local.event.get_event = MagicMock(side_effect=_get_event) + + list(Batch.run(batch)) + + fire_calls = batch.local.event.fire_event.call_args_list + tags = [c.args[1] for c in fire_calls] + assert any(t.endswith("/halted") for t in tags) + assert not any(t.endswith("/complete") for t in tags) + + +def test_run_event_failures_are_swallowed(batch): + """ + The CLI batch must remain functional even when the master event + bus is broken or unreachable: every fire / subscribe / poll path + must catch and log instead of propagating. This preserves the + 3007.x behavior of "no visibility but the batch still works." + """ + batch.opts = { + "batch": "100%", + "timeout": 5, + "fun": "test.ping", + "arg": [], + "gather_job_timeout": 5, + } + batch.gather_minions = MagicMock(return_value=[["m1"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + batch.local.event.fire_event = MagicMock(side_effect=OSError("bus down")) + batch.local.event.subscribe = MagicMock(side_effect=OSError("bus down")) + batch.local.event.unsubscribe = MagicMock(side_effect=OSError("bus down")) + batch.local.event.get_event = MagicMock(side_effect=OSError("bus down")) + + # Should not raise; should yield m1's return. 
+ results = list(Batch.run(batch)) + assert len(results) == 1 + assert next(iter(results[0][0].values())) is True diff --git a/tests/pytests/unit/cli/test_batch_parity.py b/tests/pytests/unit/cli/test_batch_parity.py index 2c3a6e3bc075..9d9ed3ea44cf 100644 --- a/tests/pytests/unit/cli/test_batch_parity.py +++ b/tests/pytests/unit/cli/test_batch_parity.py @@ -162,13 +162,21 @@ def _mock_iter(*args, **kwargs): batch.local.cmd_iter_no_block = MagicMock(side_effect=_mock_iter) + # The sync CLI driver no longer persists ``.batch.p`` (see issue + # #69418), so we capture the post-tick state by wrapping + # ``progress_batch`` in the ``salt.cli.batch`` namespace. persisted = {} + real_progress_batch = progress_batch - def _capture_state(jid, state, opts, *, best_effort=False): + def _capture_progress(state, *args, **kwargs): + action = real_progress_batch(state, *args, **kwargs) persisted["last"] = dict(state) - return True + return action - with patch("salt.utils.batch_state.write_batch_state", side_effect=_capture_state): + with patch( + "salt.cli.batch.salt.utils.batch_state.progress_batch", + side_effect=_capture_progress, + ): list(Batch.run(batch)) driver_state = persisted["last"] @@ -205,13 +213,21 @@ def _empty_iter(*args, **kwargs): batch.local.cmd_iter_no_block = MagicMock(side_effect=_empty_iter) + # See ``test_sync_driver_matches_progress_batch_oracle`` for why + # we capture the driver state via ``progress_batch`` rather than + # ``write_batch_state`` (issue #69418). 
persisted = {} + real_progress_batch = progress_batch - def _capture_state(jid, state, opts, *, best_effort=False): + def _capture_progress(state, *args, **kwargs): + action = real_progress_batch(state, *args, **kwargs) persisted["last"] = dict(state) - return True + return action - with patch("salt.utils.batch_state.write_batch_state", side_effect=_capture_state): + with patch( + "salt.cli.batch.salt.utils.batch_state.progress_batch", + side_effect=_capture_progress, + ): yields = list(Batch.run(batch)) assert persisted["last"]["failed"] == {"m1": "timeout", "m2": "timeout"} diff --git a/tests/pytests/unit/cli/test_batch_visibility.py b/tests/pytests/unit/cli/test_batch_visibility.py new file mode 100644 index 000000000000..4c4ead74c2ad --- /dev/null +++ b/tests/pytests/unit/cli/test_batch_visibility.py @@ -0,0 +1,357 @@ +""" +Wired-up tests for sync CLI batch visibility via the master-side +``BatchManager`` (issue #69418). + +These tests bridge ``Batch.run()`` and a real ``BatchManager`` by +routing the CLI's ``self.local.event.fire_event`` calls into the +manager's ``_handle_event`` directly, instead of through a real master +event bus. That lets us prove the end-to-end visibility contract +without a forked master process: + +* While the sync CLI batch is running, ``batch.list_active`` / + ``batch.status`` see the batch via the manager-written + ``.batch.p`` + active index. +* When ``batch.stop`` is invoked, the manager fires + ``salt/batch//halted`` and the CLI observes it and exits. +* Master-as-``salt``-user / CLI-as-``root`` style deployments + cannot regress: the CLI itself never writes under the master's + cachedir, asserted by patching out the persistence helpers. +""" + +import pytest + +# Initialize ``__opts__`` on the runner module the way the loader +# would at runtime. This module-import-time setup keeps ``patch.dict`` +# from having to ``create=True`` (which trips the salt-loader-dunder +# pylint plugin). 
+import salt.runners.batch as _batch_runner_module # noqa: E402 +import salt.utils.batch_output +import salt.utils.batch_state +from salt.cli.batch import Batch +from salt.runners import batch as batch_runner +from salt.utils.batch_manager import BatchManager +from tests.support.mock import MagicMock, patch + +if not hasattr(_batch_runner_module, "__dict__") or "__opts__" not in vars( + _batch_runner_module +): + vars(_batch_runner_module).setdefault("__opts__", {}) + + +def _patch_runner_opts(opts): + """Override __opts__ for the batch runner during a single test.""" + return patch.dict(batch_runner.__opts__, opts, clear=True) + + +@pytest.fixture +def opts(tmp_path): + """Master-side opts shared by the CLI and the manager.""" + return { + "cachedir": str(tmp_path), + "hash_type": "sha256", + "sock_dir": str(tmp_path), + "conf_file": str(tmp_path / "master"), + "batch_manager_loop_interval": 5, + } + + +@pytest.fixture +def manager(opts): + """A BatchManager wired up without a real fork.""" + mgr = BatchManager(opts) + mgr.event = MagicMock() + mgr.local = MagicMock() + mgr.output = MagicMock() + mgr.active_batches = set() + return mgr + + +@pytest.fixture +def cli_batch(opts): + """ + A sync CLI ``Batch`` whose ``LocalClient`` is mocked. The + fixture leaves ``local.event`` as a default ``MagicMock`` — + individual tests can replace it with a bus bridge. + """ + cli_opts = { + "batch": "1", + "tgt": "*", + "tgt_type": "glob", + "fun": "test.ping", + "arg": [], + "timeout": 5, + "gather_job_timeout": 5, + "transport": "", + } + cli_opts.update(opts) + mock_client = MagicMock() + with patch( + "salt.client.get_local_client", MagicMock(return_value=mock_client) + ), patch("salt.client.LocalClient.cmd_iter", MagicMock(return_value=[])): + yield Batch(cli_opts, quiet=True) + + +def _bridge_cli_to_manager(cli_batch, manager): + """ + Wire ``cli_batch.local.event.fire_event`` so each fired event is + immediately routed through ``manager._handle_event``. 
Models + "the master event bus delivers the event to the manager," which + is the contract that matters here. + + The CLI's halt-event poll (``get_event``) defaults to ``None``; + individual tests override it to inject a stop. + """ + cli_batch.local.event.get_event = MagicMock(return_value=None) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + + def _bridge(payload, tag): + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_bridge) + + +def test_batch_list_active_sees_running_sync_cli_batch(cli_batch, manager, opts): + """ + During a sync CLI batch run, the master-side ``batch.list_active`` + runner must see the JID. The manager writes ``.batch.p`` from the + ``salt/batch//new`` event and adds the JID to the active + index — both of which ``list_active`` reads. + """ + _bridge_cli_to_manager(cli_batch, manager) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + seen = {"during_first_iter": None} + + # The 2-minion batch=1 case results in two ``cmd_iter_no_block`` + # calls. On the *second* call (i.e. mid-run between sub-batches) + # snapshot what ``batch.list_active`` returns. 
+ call_count = {"n": 0} + + def _make_iter(*args, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 2: + with _patch_runner_opts(opts): + seen["during_first_iter"] = batch_runner.list_active() + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with _patch_runner_opts(opts): + list(Batch.run(cli_batch)) + + assert seen["during_first_iter"] is not None, "list_active was not invoked mid-run" + assert len(seen["during_first_iter"]) == 1 + summary = seen["during_first_iter"][0] + assert summary["driver"] == "cli" + assert summary["fun"] == "test.ping" + + # After the run, the manager's ``salt/batch//complete`` + # handler should have removed the JID from the index. + with _patch_runner_opts(opts): + after = batch_runner.list_active() + assert after == [] + + +def test_batch_status_sees_running_sync_cli_batch(cli_batch, manager, opts): + """ + ``batch.status `` must return live progress for a sync CLI + batch while it's running. 
+ """ + cli_batch.local.event.get_event = MagicMock(return_value=None) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + captured_jid = {"jid": None} + seen = {"during": None} + + def _capture_jid(payload, tag): + if tag.endswith("/new"): + captured_jid["jid"] = payload["jid"] + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_capture_jid) + + call_count = {"n": 0} + + def _make_iter(*args, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 2 and captured_jid["jid"]: + with _patch_runner_opts(opts): + seen["during"] = batch_runner.status(captured_jid["jid"]) + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + with _patch_runner_opts(opts): + list(Batch.run(cli_batch)) + + assert seen["during"] is not None + assert seen["during"]["driver"] == "cli" + assert seen["during"]["total"] == 2 + + +def test_batch_stop_halts_sync_cli_batch(cli_batch, manager, opts): + """ + ``salt-run batch.stop `` fires ``salt/batch//stop``, + which the manager's ``_handle_stop`` converts to a write of + ``halted=True`` and an outgoing ``salt/batch//halted`` + event. The CLI subscribes to ``/halted`` and observes it via + ``get_event``; the run terminates without completing the + remaining minions. + """ + captured = {"jid": None, "halted_payload": None} + + # Capture the jid as soon as the CLI fires ``new``. + def _bridge(payload, tag): + if tag.endswith("/new"): + captured["jid"] = payload["jid"] + if tag.endswith("/halted") and "state" in payload: + captured["halted_payload"] = payload + + # Make the manager's _handle_stop fire a halted event back at + # us by having the manager use its own fire_event. 
We + # capture what the manager would emit by stubbing + # mgr.event.fire_event to call back into ourselves. + manager._handle_event({"tag": tag, "data": payload}) + + cli_batch.local.event.fire_event = MagicMock(side_effect=_bridge) + cli_batch.local.event.subscribe = MagicMock(return_value=None) + cli_batch.local.event.unsubscribe = MagicMock(return_value=None) + + # When the manager handles ``stop``, its EventOutput will + # eventually try to fire ``halted``; since the manager's event is + # a MagicMock, we capture its fire_event call and have the CLI's + # get_event surface that payload on next poll. + halt_box = {"emitted": None} + + def _mgr_fire_event(payload, tag): + if tag.endswith("/halted"): + halt_box["emitted"] = payload + + manager.event.fire_event.side_effect = _mgr_fire_event + manager.output = salt.utils.batch_output.EventOutput(opts, manager.event) + + def _make_iter(*args, **kwargs): + # Yield None forever so the loop can only end via halt. + while True: + yield None + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2", "m3"], [], []]) + + poll_count = {"n": 0} + + def _get_event(*args, **kwargs): + poll_count["n"] += 1 + if poll_count["n"] == 1: + # First poll: simulate the operator running + # ``salt-run batch.stop`` between CLI loop iterations + # by directly invoking the runner with our shared opts. + with patch.dict( + batch_runner.__opts__, + opts, + clear=True, + create=True, + ): + # Bypass the runner's get_master_event context + # manager — its event bus isn't real here — and call + # the manager's stop handler directly. This models + # what would happen on a live master once the + # runner's fired event reaches the manager loop. + manager._handle_stop( + captured["jid"], {"jid": captured["jid"], "reason": "stop"} + ) + return None + if poll_count["n"] == 2 and halt_box["emitted"] is not None: + # The manager just emitted halted; surface it to the CLI. 
+ return halt_box["emitted"] + return None + + cli_batch.local.event.get_event = MagicMock(side_effect=_get_event) + + with _patch_runner_opts(opts): + results = list(Batch.run(cli_batch)) + + # Run must have terminated without yielding all minions; the + # halt event was observed. + fire_tags = [c.args[1] for c in cli_batch.local.event.fire_event.call_args_list] + assert any(t.endswith("/halted") for t in fire_tags), ( + "CLI must emit /halted on teardown when halted, got: %s" % fire_tags + ) + + # And the final state on disk should reflect the stop. + on_disk = salt.utils.batch_state.read_batch_state(captured["jid"], opts) + assert on_disk is not None + assert on_disk["halted"] is True + assert on_disk["halted_reason"] == "stop" + + # Batch.list_active should no longer include the JID. + assert captured["jid"] not in salt.utils.batch_state.read_active_index(opts) + # Returned results may be empty (iterator never produced a + # return) or partial; we don't assert exact length, only that + # the loop exited. + del results + + +def test_master_runs_as_salt_user_cli_as_root_scenario(cli_batch, manager, opts): + """ + Reproduces the user-facing scenario from issue #69418: salt-master + running as user ``salt`` (so it owns ````), salt CLI + invoked as ``root``. In that deployment, any write to the + master's cachedir from the CLI process pre-creates the JID dir + with root ownership and bricks the master's subsequent + ``local_cache.prep_jid`` write. + + We simulate the constraint by making the cachedir un-writable + from the CLI's vantage point (``os.makedirs`` raises + ``PermissionError``) and verify that the batch run still + completes — because all persistence goes through the manager's + bridge, which in this test is the only caller that touches the + disk. 
+ """ + _bridge_cli_to_manager(cli_batch, manager) + cli_batch.gather_minions = MagicMock(return_value=[["m1", "m2"], [], []]) + + def _make_iter(*args, **kwargs): + for m in args[0]: + yield {m: {"ret": True, "retcode": 0}} + + cli_batch.local.cmd_iter_no_block = MagicMock(side_effect=_make_iter) + + real_makedirs = __import__("os").makedirs + real_open = __import__("builtins").open + + # The manager handlers run *synchronously* inside the test's + # process, so a global ``os.makedirs`` block would also break + # the manager. Instead, simulate the failure mode at the + # source: any call to the persistence helpers from inside + # ``salt.cli.batch`` raises (using mock.patch's spec ensures we + # don't accidentally pick up other importers). + with patch( + "salt.cli.batch.salt.utils.batch_state.write_batch_state", + side_effect=PermissionError("simulated: master cachedir not writable from CLI"), + ), patch( + "salt.cli.batch.salt.utils.batch_state.add_to_active_index", + side_effect=PermissionError("simulated"), + ): + results = list(Batch.run(cli_batch)) + + # The patches above also affect the manager's view because + # Python module objects are shared (see + # https://docs.python.org/3/reference/import.html#submodules), + # but the CLI's *new* code path doesn't call those helpers + # directly — so the patches are never hit and the run completes + # normally. Both minions return. + assert len(results) == 2 + assert all(next(iter(d.values())) is True for d, _ in results) + + # Best-effort: ``real_makedirs`` and ``real_open`` are + # retained for symmetry — we don't actually need them here, but + # the import warms ``os`` and ``builtins`` and keeps the test + # close to the deployment shape. 
+ assert real_makedirs is not None + assert real_open is not None diff --git a/tests/pytests/unit/utils/test_batch_manager.py b/tests/pytests/unit/utils/test_batch_manager.py index f733f74d688d..cffd998154ee 100644 --- a/tests/pytests/unit/utils/test_batch_manager.py +++ b/tests/pytests/unit/utils/test_batch_manager.py @@ -86,7 +86,7 @@ class TestHandleEvent: def test_batch_new_calls_handle_new(self, manager): manager._handle_new = MagicMock() manager._handle_event({"tag": "salt/batch/JID1/new", "data": {}}) - manager._handle_new.assert_called_once_with("JID1") + manager._handle_new.assert_called_once_with("JID1", {}) def test_batch_stop_calls_handle_stop(self, manager): manager._handle_stop = MagicMock() @@ -140,10 +140,42 @@ def test_adopts_master_driven_batch(self, manager, opts): assert "JID1" in manager.active_batches assert salt.utils.batch_state.read_active_index(opts) == {"JID1"} - def test_ignores_cli_driven_batch(self, manager, opts): + def test_registers_cli_driven_batch_without_adopting(self, manager, opts): + """ + Sync CLI batches arrive at the manager via + ``salt/batch//new`` carrying ``data["state"]``. The + manager persists them and adds to the active index so + ``batch.status`` / ``batch.list_active`` see them, but does + not adopt them into ``self.active_batches`` (the CLI owns + the state machine). + """ + # Build the state inline — the CLI never wrote .batch.p, the + # manager is supposed to write it on receipt of the event. 
+ state = salt.utils.batch_state.create_batch_state( + {"batch": 1, "fun": "test.ping", "tgt": "*"}, + ["m1"], + "JID-CLI", + driver="cli", + now=1000.0, + ) + manager._handle_new("JID-CLI", {"state": state}) + assert "JID-CLI" not in manager.active_batches + assert salt.utils.batch_state.read_active_index(opts) == {"JID-CLI"} + on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts) + assert on_disk is not None + assert on_disk["driver"] == "cli" + + def test_cli_handle_new_falls_back_to_disk_state(self, manager, opts): + """ + Belt-and-braces: if a ``salt/batch//new`` event arrives + without an embedded ``state`` payload (older CLI, or the + event was triggered some other way) and ``.batch.p`` happens + to already exist, we still register the JID in the index. + """ _write_state(opts, ["m1"], "JID-CLI", driver="cli") manager._handle_new("JID-CLI") assert "JID-CLI" not in manager.active_batches + assert "JID-CLI" in salt.utils.batch_state.read_active_index(opts) def test_handles_missing_batch_file(self, manager, opts): manager._handle_new("NONEXISTENT") @@ -361,3 +393,149 @@ def test_tick_drives_timeout_detection(self, manager, opts): assert set(state["failed"].keys()) == {"m1", "m2"} assert state["failed"]["m1"] == "timeout" assert "JID1" not in manager.active_batches + + def test_tick_does_not_adopt_cli_driven_batches(self, manager, opts): + """ + ``_tick`` reconciles the in-memory active set with the + on-disk index but must never adopt a sync CLI batch + (``driver="cli"``). The CLI process owns the state machine + for those — adopting them would cause the manager to either + re-publish or false-timeout in-flight minions (issue #69418). 
+        """
+        _write_state(opts, ["m1", "m2"], "JID-CLI", batch_size=1, driver="cli")
+        salt.utils.batch_state.add_to_active_index("JID-CLI", opts)
+
+        manager._tick(now=1.0)
+
+        assert "JID-CLI" not in manager.active_batches
+        manager.local.run_job.assert_not_called()
+        # The index entry is preserved so ``batch.list_active``
+        # still sees it.
+        assert "JID-CLI" in salt.utils.batch_state.read_active_index(opts)
+
+    def test_tick_prunes_index_entries_with_no_state_file(self, manager, opts):
+        """
+        Reconcile-time housekeeping: if an active-index entry has
+        no readable ``.batch.p`` (e.g. left over from a crashed
+        process), drop it. Maintenance does the same pass on its
+        slower cadence; this just lets ``_tick`` close the gap.
+        """
+        salt.utils.batch_state.add_to_active_index("GHOST", opts)
+        manager._tick(now=1.0)
+        assert "GHOST" not in salt.utils.batch_state.read_active_index(opts)
+        assert "GHOST" not in manager.active_batches
+
+
+# ---------------------------------------------------------------------------
+# _handle_progress / _handle_terminal — sync CLI persistence handoff
+# ---------------------------------------------------------------------------
+
+
+class TestHandleProgressAndTerminal:
+    """
+    ``_handle_progress`` / ``_handle_terminal`` receive the state the
+    sync CLI ships in ``data["state"]``; these tests pin what ends up
+    on disk and in the active-batch index afterwards.
+    """
+
+    @staticmethod
+    def _make_state(jid, minions, driver="cli"):
+        """
+        Build the batch state shared by the tests in this class:
+        batch size 1, ``test.ping`` against ``*``, created at
+        ``now=1000.0``. Only the jid, minion list and driver vary.
+        """
+        return salt.utils.batch_state.create_batch_state(
+            {"batch": 1, "fun": "test.ping", "tgt": "*"},
+            minions,
+            jid,
+            driver=driver,
+            now=1000.0,
+        )
+
+    def test_handle_progress_persists_cli_state(self, manager, opts):
+        """
+        ``salt/batch/<jid>/progress`` from the sync CLI carries the
+        full post-tick state in ``data["state"]``. The manager
+        writes it so ``batch.status <jid>`` reflects the latest
+        snapshot.
+        """
+        state = self._make_state("JID-CLI", ["m1", "m2"])
+        state["done"] = {"m1": {"ret": True, "retcode": 0}}
+        manager._handle_progress("JID-CLI", {"state": state})
+
+        on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts)
+        assert on_disk is not None
+        assert on_disk["done"] == {"m1": {"ret": True, "retcode": 0}}
+
+    def test_handle_progress_ignores_master_driven_state(self, manager, opts):
+        """
+        Master-driven batches don't fire ``salt/batch/<jid>/progress``
+        from outside the manager — guard against a misuse that would
+        let an external actor overwrite the manager's own state.
+        """
+        state = self._make_state("JID-MGR", ["m1"], driver="master")
+        manager._handle_progress("JID-MGR", {"state": state})
+        # Nothing written.
+        assert salt.utils.batch_state.read_batch_state("JID-MGR", opts) is None
+
+    def test_handle_progress_drops_payload_without_state(self, manager, opts):
+        """
+        A progress payload with no ``state`` key is dropped: the call
+        must not raise, and no state file may appear for the jid.
+        """
+        manager._handle_progress("JID-CLI", {})
+        # "No write" is checked the same way as for the master-driven
+        # case above: the state read back for the jid is still ``None``.
+        assert salt.utils.batch_state.read_batch_state("JID-CLI", opts) is None
+
+    def test_handle_terminal_complete_removes_index_entry(self, manager, opts):
+        """
+        A ``complete`` terminal event drops the jid from the active
+        index while the shipped state stays readable on disk.
+        """
+        state = self._make_state("JID-CLI", ["m1"])
+        salt.utils.batch_state.add_to_active_index("JID-CLI", opts)
+        manager._handle_terminal("JID-CLI", {"state": state}, "complete")
+
+        assert "JID-CLI" not in salt.utils.batch_state.read_active_index(opts)
+        on_disk = salt.utils.batch_state.read_batch_state("JID-CLI", opts)
+        assert on_disk is not None
+        assert on_disk["driver"] == "cli"
+
+    def test_handle_terminal_halted_removes_index_entry(self, manager, opts):
+        """
+        A ``halted`` terminal event (state already flagged as halted
+        with reason ``stop``) drops the jid from the active index.
+        """
+        state = self._make_state("JID-CLI", ["m1"])
+        state["halted"] = True
+        state["halted_reason"] = "stop"
+        salt.utils.batch_state.add_to_active_index("JID-CLI", opts)
+        manager._handle_terminal("JID-CLI", {"state": state}, "halted")
+
+        assert "JID-CLI" not in salt.utils.batch_state.read_active_index(opts)
+
+
+# ---------------------------------------------------------------------------
+# Event dispatch — sync CLI events route to the right handlers
+# ---------------------------------------------------------------------------
+
+
+class TestSyncCLIEventDispatch:
+    """
+    ``_handle_event`` routing for the sync CLI tags. Each test swaps the
+    target handler for a ``MagicMock`` and checks it is called exactly
+    once with the jid taken from the tag and the event's ``data``.
+    """
+
+    def test_batch_progress_dispatches_to_handle_progress(self, manager):
+        """``salt/batch/<jid>/progress`` reaches ``_handle_progress``."""
+        payload = {"state": {"jid": "JID1", "driver": "cli"}}
+        manager._handle_progress = MagicMock()
+        manager._handle_event({"tag": "salt/batch/JID1/progress", "data": payload})
+        manager._handle_progress.assert_called_once_with("JID1", payload)
+
+    def test_batch_complete_dispatches_to_handle_terminal(self, manager):
+        """``salt/batch/<jid>/complete`` reaches ``_handle_terminal`` as ``"complete"``."""
+        payload = {"state": {}}
+        manager._handle_terminal = MagicMock()
+        manager._handle_event({"tag": "salt/batch/JID1/complete", "data": payload})
+        manager._handle_terminal.assert_called_once_with("JID1", payload, "complete")
+
+    def test_batch_halted_dispatches_to_handle_terminal(self, manager):
+        """``salt/batch/<jid>/halted`` reaches ``_handle_terminal`` as ``"halted"``."""
+        payload = {"state": {}}
+        manager._handle_terminal = MagicMock()
+        manager._handle_event({"tag": "salt/batch/JID1/halted", "data": payload})
+        manager._handle_terminal.assert_called_once_with("JID1", payload, "halted")