From c3390bdbb5bc01a0ce2b7bde6b0b795297b18120 Mon Sep 17 00:00:00 2001 From: Victor Zhestkov Date: Tue, 25 May 2021 11:58:12 +0300 Subject: [PATCH 01/12] Async batch implementation RFC#0002 Co-authored-by: Mihai Dinca --- salt/auth/__init__.py | 2 + salt/cli/batch.py | 169 +++++-- salt/cli/batch_async.py | 516 +++++++++++++++++++++ salt/client/__init__.py | 44 +- salt/master.py | 23 + salt/utils/event.py | 8 + tests/pytests/unit/cli/test_batch.py | 85 +++- tests/pytests/unit/cli/test_batch_async.py | 430 +++++++++++++++++ 8 files changed, 1204 insertions(+), 73 deletions(-) create mode 100644 salt/cli/batch_async.py create mode 100644 tests/pytests/unit/cli/test_batch_async.py diff --git a/salt/auth/__init__.py b/salt/auth/__init__.py index d929f4e42cb1..996f95c7bf9f 100644 --- a/salt/auth/__init__.py +++ b/salt/auth/__init__.py @@ -47,6 +47,8 @@ "print_event", "raw", "yield_pub_data", + "batch", + "batch_delay", ] ) diff --git a/salt/cli/batch.py b/salt/cli/batch.py index ff02bb59a754..56f45fbfc6c1 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -33,6 +33,138 @@ log = logging.getLogger(__name__) +def get_bnum(opts, minions, quiet): + """ + Return the active number of minions to maintain + + :param dict opts: + The salt options dictionary. + + :param minions: + The list of the minions to perform the calculation. + + :param boolean quiet: + Suppress the output to the CLI. + + Preserves the legacy return values (``None`` for invalid + input, ``0`` for an empty minion list with a percentage spec, + ``0`` for ``batch=0``) for backward compatibility with any + callers that rely on them. The shared state machine uses the + hardened :func:`salt.utils.batch_state.get_batch_size` which + always returns at least 1. + """ + + def partition(x): + return float(x) / 100.0 * len(minions) + + try: + if isinstance(opts["batch"], str) and "%" in opts["batch"]: + res = partition(float(opts["batch"].strip("%"))) + if res < 1: + return int(math.ceil(res)) + return int(res) + return int(opts["batch"]) + except ValueError: + if not quiet: + salt.utils.stringutils.print_cli( + "Invalid batch data sent: {}\nData must be in the " + "form of %10, 10% or 3".format(opts["batch"]) + ) + + +def batch_get_opts( + tgt, fun, batch, parent_opts, arg=(), tgt_type="glob", ret="", kwarg=None, **kwargs +): + """ + Return the dictionary with batch options populated + + :param tgt: + Which minions to target for the execution. + + :param str fun: + The function to run. + + :param batch: + The batch size. + + :param dict parent_opts: + The salt options dictionary. + + :param list arg: + The arguments to put to the resulting ``arg`` key of resulting dictionary. + + :param str tgt_type: + Default ``glob``. Target type to use with ``tgt``. + + :param ret: + ``ret`` parameter to put to the resulting dictionary. + + :param dict kwarg: + Extra arguments to put to the resulting ``arg`` key of resulting dictionary. + + :param dict kwargs: + Extra keyword arguments. + + """ + # We need to re-import salt.utils.args here + # even though it has already been imported. + # when cmd_batch is called via the NetAPI + # the module is unavailable. + import salt.utils.args + + arg = salt.utils.args.condition_input(arg, kwarg) + opts = { + "tgt": tgt, + "fun": fun, + "arg": arg, + "tgt_type": tgt_type, + "ret": ret, + "batch": batch, + "failhard": kwargs.get("failhard", parent_opts.get("failhard", False)), + "raw": kwargs.get("raw", False), + } + + if "timeout" in kwargs: + opts["timeout"] = kwargs["timeout"] + if "gather_job_timeout" in kwargs: + opts["gather_job_timeout"] = kwargs["gather_job_timeout"] + if "batch_wait" in kwargs: + opts["batch_wait"] = int(kwargs["batch_wait"]) + + for key, val in parent_opts.items(): + if key not in opts: + opts[key] = val + + opts["batch_presence_ping_timeout"] = kwargs.get( + "batch_presence_ping_timeout", opts["timeout"] + ) + opts["batch_presence_ping_gather_job_timeout"] = kwargs.get( + "batch_presence_ping_gather_job_timeout", opts["gather_job_timeout"] + ) + + return opts + + +def batch_get_eauth(kwargs): + """ + Return the dictionary with eauth information + + :param dict kwargs: + Keyword arguments to extract eauth data from. + + """ + eauth = {} + if "eauth" in kwargs: + eauth["eauth"] = kwargs.pop("eauth") + if "username" in kwargs: + eauth["username"] = kwargs.pop("username") + if "password" in kwargs: + eauth["password"] = kwargs.pop("password") + if "token" in kwargs: + eauth["token"] = kwargs.pop("token") + return eauth + + class Batch: """ Manage the execution of batch runs. @@ -55,6 +187,7 @@ def __init__(self, opts, eauth=None, quiet=False, _parser=None): self.pub_kwargs = eauth if eauth else {} self.quiet = quiet self.options = _parser + self.minions = set() # Passing listen True to local client will prevent it from purging # cached events while iterating over the batches. self.local = salt.client.get_local_client(opts["conf_file"], listen=True) @@ -67,7 +200,7 @@ def gather_minions(self): self.opts["tgt"], "test.ping", [], - self.opts["timeout"], + self.opts.get("batch_presence_ping_timeout", self.opts["timeout"]), ] selected_target_option = self.opts.get("selected_target_option", None) @@ -79,7 +212,10 @@ def gather_minions(self): self.pub_kwargs["yield_pub_data"] = True ping_gen = self.local.cmd_iter( *args, - gather_job_timeout=self.opts["gather_job_timeout"], + gather_job_timeout=self.opts.get( + "batch_presence_ping_gather_job_timeout", + self.opts["gather_job_timeout"], + ), **self.pub_kwargs, ) @@ -110,35 +246,6 @@ def gather_minions(self): return (list(fret), ping_gen, nret.difference(fret)) - def get_bnum(self): - """ - Return the active number of minions to maintain. - - Preserves the legacy return values (``None`` for invalid - input, ``0`` for an empty minion list with a percentage spec, - ``0`` for ``batch=0``) for backward compatibility with any - callers that rely on them. The shared state machine uses the - hardened :func:`salt.utils.batch_state.get_batch_size` which - always returns at least 1. - """ - - def partition(x): - return float(x) / 100.0 * len(self.minions) - - try: - if isinstance(self.opts["batch"], str) and "%" in self.opts["batch"]: - res = partition(float(self.opts["batch"].strip("%"))) - if res < 1: - return int(math.ceil(res)) - return int(res) - return int(self.opts["batch"]) - except ValueError: - if not self.quiet: - salt.utils.stringutils.print_cli( - "Invalid batch data sent: {}\nData must be in the " - "form of %10, 10% or 3".format(self.opts["batch"]) - ) - def run(self): """ Execute the batch run. diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py new file mode 100644 index 000000000000..19f8e647d776 --- /dev/null +++ b/salt/cli/batch_async.py @@ -0,0 +1,516 @@ +""" +Execute a job on the targeted minions by using a moving window of fixed size `batch`. +""" + +import logging +import re + +import salt.client +import salt.ext.tornado +import salt.utils.event +from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum +from salt.ext.tornado.iostream import StreamClosedError + +log = logging.getLogger(__name__) + + +__SHARED_EVENTS_CHANNEL = None + + +def _get_shared_events_channel(opts, io_loop): + global __SHARED_EVENTS_CHANNEL + if __SHARED_EVENTS_CHANNEL is None: + __SHARED_EVENTS_CHANNEL = SharedEventsChannel(opts, io_loop) + return __SHARED_EVENTS_CHANNEL + + +def _destroy_unused_shared_events_channel(): + global __SHARED_EVENTS_CHANNEL + if __SHARED_EVENTS_CHANNEL is not None and __SHARED_EVENTS_CHANNEL.destroy_unused(): + __SHARED_EVENTS_CHANNEL = None + + +def batch_async_required(opts, minions, extra): + """ + Check opts to identify if batch async is required for the operation. + """ + if not isinstance(minions, list): + return False + batch_async_opts = opts.get("batch_async", {}) + batch_async_threshold = ( + batch_async_opts.get("threshold", 1) + if isinstance(batch_async_opts, dict) + else 1 + ) + if batch_async_threshold == -1: + batch_size = get_bnum(extra, minions, True) + return len(minions) >= batch_size + elif batch_async_threshold > 0: + return len(minions) >= batch_async_threshold + return False + + +class SharedEventsChannel: + def __init__(self, opts, io_loop): + self.io_loop = io_loop + self.local_client = salt.client.get_local_client( + opts["conf_file"], io_loop=self.io_loop + ) + self.master_event = salt.utils.event.get_event( + "master", + sock_dir=self.local_client.opts["sock_dir"], + opts=self.local_client.opts, + listen=True, + io_loop=self.io_loop, + keep_loop=True, + ) + self.master_event.set_event_handler(self.__handle_event) + if self.master_event.subscriber._stream: + self.master_event.subscriber._stream.set_close_callback(self.__handle_close) + self._re_tag_ret_event = re.compile(r"salt\/job\/(\d+)\/ret\/.*") + self._subscribers = {} + self._subscriptions = {} + self._used_by = set() + batch_async_opts = opts.get("batch_async", {}) + if not isinstance(batch_async_opts, dict): + batch_async_opts = {} + self._subscriber_reconnect_tries = batch_async_opts.get( + "subscriber_reconnect_tries", 5 + ) + self._subscriber_reconnect_interval = batch_async_opts.get( + "subscriber_reconnect_interval", 1.0 + ) + self._reconnecting_subscriber = False + + def subscribe(self, jid, op, subscriber_id, handler): + if subscriber_id not in self._subscribers: + self._subscribers[subscriber_id] = set() + if jid not in self._subscriptions: + self._subscriptions[jid] = [] + self._subscribers[subscriber_id].add(jid) + if (op, subscriber_id, handler) not in self._subscriptions[jid]: + self._subscriptions[jid].append((op, subscriber_id, handler)) + if not self.master_event.subscriber.connected(): + self.__reconnect_subscriber() + + def unsubscribe(self, jid, op, subscriber_id): + if subscriber_id not in self._subscribers: + return + jids = self._subscribers[subscriber_id].copy() + if jid is not None: + jids = set(jid) + for i_jid in jids: + self._subscriptions[i_jid] = list( + filter( + lambda x: not (op in (x[0], None) and x[1] == subscriber_id), + self._subscriptions.get(i_jid, []), + ) + ) + self._subscribers[subscriber_id].discard(i_jid) + self._subscriptions = dict(filter(lambda x: x[1], self._subscriptions.items())) + if not self._subscribers[subscriber_id]: + del self._subscribers[subscriber_id] + + @salt.ext.tornado.gen.coroutine + def __handle_close(self): + if not self._subscriptions: + return + log.warning("Master Event Subscriber was closed. Trying to reconnect...") + yield self.__reconnect_subscriber() + + @salt.ext.tornado.gen.coroutine + def __handle_event(self, raw): + if self.master_event is None: + return + try: + tag, data = self.master_event.unpack(raw) + tag_match = self._re_tag_ret_event.match(tag) + if tag_match: + jid = tag_match.group(1) + if jid in self._subscriptions: + for op, _, handler in self._subscriptions[jid]: + yield handler(tag, data, op) + except Exception as ex: # pylint: disable=W0703 + log.error( + "Exception occured while processing event: %s: %s", + tag, + ex, + exc_info=True, + ) + + @salt.ext.tornado.gen.coroutine + def __reconnect_subscriber(self): + if self.master_event.subscriber.connected() or self._reconnecting_subscriber: + return + self._reconnecting_subscriber = True + max_tries = max(1, int(self._subscriber_reconnect_tries)) + _try = 1 + while _try <= max_tries: + log.info( + "Trying to reconnect to event publisher (try %d of %d) ...", + _try, + max_tries, + ) + try: + yield self.master_event.subscriber.connect() + except StreamClosedError: + log.warning( + "Unable to reconnect to event publisher (try %d of %d)", + _try, + max_tries, + ) + if self.master_event.subscriber.connected(): + self.master_event.subscriber.stream.set_close_callback( + self.__handle_close + ) + log.info("Event publisher connection restored") + self._reconnecting_subscriber = False + return + if _try < max_tries: + yield salt.ext.tornado.gen.sleep(self._subscriber_reconnect_interval) + _try += 1 + self._reconnecting_subscriber = False + + def use(self, subscriber_id): + self._used_by.add(subscriber_id) + return self + + def unuse(self, subscriber_id): + self._used_by.discard(subscriber_id) + + def destroy_unused(self): + if self._used_by: + return False + self.master_event.remove_event_handler(self.__handle_event) + self.master_event.destroy() + self.master_event = None + self.local_client.destroy() + self.local_client = None + return True + + +class BatchAsync: + """ + Run a job on the targeted minions by using a moving window of fixed size `batch`. + + ``BatchAsync`` is used to execute a job on the targeted minions by keeping + the number of concurrent running minions to the size of `batch` parameter. + + The control parameters are: + - batch: number/percentage of concurrent running minions + - batch_delay: minimum wait time between batches + - batch_presence_ping_timeout: time to wait for presence pings before starting the batch + - gather_job_timeout: `find_job` timeout + - timeout: time to wait before firing a `find_job` + + When the batch starts, a `start` event is fired: + - tag: salt/batch//start + - data: { + "available_minions": self.minions, + "down_minions": targeted_minions - presence_ping_minions + } + + When the batch ends, a `done` event is fired: + - tag: salt/batch//done + - data: { + "available_minions": self.minions, + "down_minions": targeted_minions - presence_ping_minions + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions + } + """ + + def __init__(self, opts, jid_gen, clear_load): + self.extra_job_kwargs = {} + kwargs = clear_load.get("kwargs", {}) + for kwarg in ("module_executors", "executor_opts"): + if kwarg in kwargs: + self.extra_job_kwargs[kwarg] = kwargs[kwarg] + elif kwarg in opts: + self.extra_job_kwargs[kwarg] = opts[kwarg] + self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() + self.events_channel = _get_shared_events_channel(opts, self.io_loop).use( + id(self) + ) + if "gather_job_timeout" in clear_load["kwargs"]: + clear_load["gather_job_timeout"] = clear_load["kwargs"].pop( + "gather_job_timeout" + ) + else: + clear_load["gather_job_timeout"] = self.events_channel.local_client.opts[ + "gather_job_timeout" + ] + self.batch_presence_ping_timeout = clear_load["kwargs"].get( + "batch_presence_ping_timeout", None + ) + self.batch_delay = clear_load["kwargs"].get("batch_delay", 1) + self.opts = batch_get_opts( + clear_load.pop("tgt"), + clear_load.pop("fun"), + clear_load["kwargs"].pop("batch"), + self.events_channel.local_client.opts, + **clear_load, + ) + self.eauth = batch_get_eauth(clear_load["kwargs"]) + self.metadata = clear_load["kwargs"].get("metadata", {}) + self.minions = set() + self.targeted_minions = set() + self.timedout_minions = set() + self.done_minions = set() + self.active = set() + self.initialized = False + self.jid_gen = jid_gen + self.ping_jid = jid_gen() + self.batch_jid = jid_gen() + self.find_job_returned = set() + self.metadata.update({"batch_jid": self.batch_jid, "ping_jid": self.ping_jid}) + self.ended = False + self.event = self.events_channel.master_event + self.scheduled = False + + def __set_event_handler(self): + self.events_channel.subscribe( + self.ping_jid, "ping_return", id(self), self.__event_handler + ) + self.events_channel.subscribe( + self.batch_jid, "batch_run", id(self), self.__event_handler + ) + + @salt.ext.tornado.gen.coroutine + def __event_handler(self, tag, data, op): + if not self.event: + return + try: + minion = data["id"] + if op == "ping_return": + self.minions.add(minion) + if self.targeted_minions == self.minions: + yield self.start_batch() + elif op == "find_job_return": + if data.get("return", None): + self.find_job_returned.add(minion) + elif op == "batch_run": + if minion in self.active: + self.active.remove(minion) + self.done_minions.add(minion) + yield self.schedule_next() + except Exception as ex: # pylint: disable=W0703 + log.error( + "Exception occured while processing event: %s: %s", + tag, + ex, + exc_info=True, + ) + + def _get_next(self): + to_run = ( + self.minions.difference(self.done_minions) + .difference(self.active) + .difference(self.timedout_minions) + ) + next_batch_size = min( + len(to_run), # partial batch (all left) + self.batch_size - len(self.active), # full batch or available slots + ) + return set(list(to_run)[:next_batch_size]) + + @salt.ext.tornado.gen.coroutine + def check_find_job(self, batch_minions, jid): + """ + Check if the job with specified ``jid`` was finished on the minions + """ + if not self.event: + return + self.events_channel.unsubscribe(jid, "find_job_return", id(self)) + + timedout_minions = batch_minions.difference(self.find_job_returned).difference( + self.done_minions + ) + self.timedout_minions = self.timedout_minions.union(timedout_minions) + self.active = self.active.difference(self.timedout_minions) + running = batch_minions.difference(self.done_minions).difference( + self.timedout_minions + ) + + if timedout_minions: + yield self.schedule_next() + + if self.event and running: + self.find_job_returned = self.find_job_returned.difference(running) + yield self.find_job(running) + + @salt.ext.tornado.gen.coroutine + def find_job(self, minions): + """ + Find if the job was finished on the minions + """ + if not self.event: + return + not_done = minions.difference(self.done_minions).difference( + self.timedout_minions + ) + if not not_done: + return + try: + jid = self.jid_gen() + self.events_channel.subscribe( + jid, "find_job_return", id(self), self.__event_handler + ) + ret = yield self.events_channel.local_client.run_job_async( + not_done, + "saltutil.find_job", + [self.batch_jid], + "list", + gather_job_timeout=self.opts["gather_job_timeout"], + jid=jid, + io_loop=self.io_loop, + listen=False, + **self.eauth, + ) + yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"]) + if self.event: + yield self.check_find_job(not_done, jid) + except Exception as ex: # pylint: disable=W0703 + log.error( + "Exception occured handling batch async: %s. Aborting execution.", + ex, + exc_info=True, + ) + self.close_safe() + + @salt.ext.tornado.gen.coroutine + def start(self): + """ + Start the batch execution + """ + if not self.event: + return + self.__set_event_handler() + ping_return = yield self.events_channel.local_client.run_job_async( + self.opts["tgt"], + "test.ping", + [], + self.opts.get("selected_target_option", self.opts.get("tgt_type", "glob")), + gather_job_timeout=self.opts["gather_job_timeout"], + jid=self.ping_jid, + metadata=self.metadata, + io_loop=self.io_loop, + listen=False, + **self.eauth, + ) + self.targeted_minions = set(ping_return["minions"]) + # start batching even if not all minions respond to ping + yield salt.ext.tornado.gen.sleep( + self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] + ) + if self.event: + yield self.start_batch() + + @salt.ext.tornado.gen.coroutine + def start_batch(self): + """ + Fire `salt/batch/*/start` and continue batch with `run_next` + """ + if self.initialized: + return + self.batch_size = get_bnum(self.opts, self.minions, True) + self.initialized = True + data = { + "available_minions": self.minions, + "down_minions": self.targeted_minions.difference(self.minions), + "metadata": self.metadata, + } + yield self.events_channel.master_event.fire_event_async( + data, f"salt/batch/{self.batch_jid}/start" + ) + if self.event: + yield self.run_next() + + @salt.ext.tornado.gen.coroutine + def end_batch(self): + """ + End the batch and call safe closing + """ + left = self.minions.symmetric_difference( + self.done_minions.union(self.timedout_minions) + ) + # Send salt/batch/*/done only if there is nothing to do + # and the event haven't been sent already + if left or self.ended: + return + self.ended = True + data = { + "available_minions": self.minions, + "down_minions": self.targeted_minions.difference(self.minions), + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions, + "metadata": self.metadata, + } + yield self.events_channel.master_event.fire_event_async( + data, f"salt/batch/{self.batch_jid}/done" + ) + + # release to the IOLoop to allow the event to be published + # before closing batch async execution + yield salt.ext.tornado.gen.sleep(1) + self.close_safe() + + def close_safe(self): + if self.events_channel is not None: + self.events_channel.unsubscribe(None, None, id(self)) + self.events_channel.unuse(id(self)) + self.events_channel = None + _destroy_unused_shared_events_channel() + self.event = None + + @salt.ext.tornado.gen.coroutine + def schedule_next(self): + if self.scheduled: + return + self.scheduled = True + # call later so that we maybe gather more returns + yield salt.ext.tornado.gen.sleep(self.batch_delay) + if self.event: + yield self.run_next() + + @salt.ext.tornado.gen.coroutine + def run_next(self): + """ + Continue batch execution with the next targets + """ + self.scheduled = False + next_batch = self._get_next() + if not next_batch: + yield self.end_batch() + return + self.active = self.active.union(next_batch) + try: + ret = yield self.events_channel.local_client.run_job_async( + next_batch, + self.opts["fun"], + self.opts["arg"], + "list", + raw=self.opts.get("raw", False), + ret=self.opts.get("return", ""), + gather_job_timeout=self.opts["gather_job_timeout"], + jid=self.batch_jid, + metadata=self.metadata, + io_loop=self.io_loop, + listen=False, + **self.eauth, + **self.extra_job_kwargs, + ) + + yield salt.ext.tornado.gen.sleep(self.opts["timeout"]) + + # The batch can be done already at this point, which means no self.event + if self.event: + yield self.find_job(set(next_batch)) + except Exception as ex: # pylint: disable=W0703 + log.error( + "Error in scheduling next batch: %s. Aborting execution", + ex, + exc_info=True, + ) + self.active = self.active.difference(next_batch) + self.close_safe() diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 3a1cd0f8e67c..b1993a97edcc 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -745,38 +745,20 @@ def cmd_batch( import salt.cli.batch import salt.utils.args - arg = salt.utils.args.condition_input(arg, kwarg) - opts = { - "tgt": tgt, - "fun": fun, - "arg": arg, - "tgt_type": tgt_type, - "ret": ret, - "batch": batch, - "failhard": kwargs.get("failhard", self.opts.get("failhard", False)), - "raw": kwargs.get("raw", False), - } + opts = salt.cli.batch.batch_get_opts( + tgt, + fun, + batch, + self.opts, + arg=arg, + tgt_type=tgt_type, + ret=ret, + kwarg=kwarg, + **kwargs + ) + + eauth = salt.cli.batch.batch_get_eauth(kwargs) - if "timeout" in kwargs: - opts["timeout"] = kwargs["timeout"] - if "gather_job_timeout" in kwargs: - opts["gather_job_timeout"] = kwargs["gather_job_timeout"] - if "batch_wait" in kwargs: - opts["batch_wait"] = int(kwargs["batch_wait"]) - - eauth = {} - if "eauth" in kwargs: - eauth["eauth"] = kwargs.pop("eauth") - if "username" in kwargs: - eauth["username"] = kwargs.pop("username") - if "password" in kwargs: - eauth["password"] = kwargs.pop("password") - if "token" in kwargs: - eauth["token"] = kwargs.pop("token") - - for key, val in self.opts.items(): - if key not in opts: - opts[key] = val batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True) for ret, _ in batch.run(): yield ret diff --git a/salt/master.py b/salt/master.py index d933ba6478f2..4f4e59ce3a31 100644 --- a/salt/master.py +++ b/salt/master.py @@ -67,6 +67,7 @@ import salt.utils.verify import salt.utils.zeromq import salt.wheel +from salt.cli.batch_async import BatchAsync, batch_async_required from salt.config import DEFAULT_INTERVAL from salt.defaults import DEFAULT_TARGET_DELIM from salt.exceptions import UnsupportedAlgorithm @@ -3622,6 +3623,7 @@ class ClearFuncs(TransportMethods): expose_methods = ( "ping", "publish", + "publish_batch", "get_token", "mk_token", "wheel", @@ -3822,6 +3824,24 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load["token"]) + async def publish_batch(self, clear_load, minions, missing): + """ + This method sends out publications to the minions in case of using batch + """ + batch_load = {} + batch_load.update(clear_load) + batch = BatchAsync( + self.local.opts, + lambda: self._prep_jid(clear_load, {}), + batch_load, + ) + asyncio.create_task(batch.start) + + return { + "enc": "clear", + "load": {"jid": batch.batch_jid, "minions": minions, "missing": missing}, + } + async def publish(self, clear_load): """ This method sends out publications to the minions, it can only be used @@ -3969,6 +3989,9 @@ async def publish(self, clear_load): ), }, } + if extra.get("batch", None) and batch_async_required(self.opts, minions, extra): + return await self.publish_batch(clear_load, minions, missing) + jid = self._prep_jid(clear_load, extra) if jid is None or isinstance(jid, dict): if jid and "error" in jid: diff --git a/salt/utils/event.py b/salt/utils/event.py index aad9efeecebf..bb80f6551588 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -994,6 +994,13 @@ def fire_ret_load(self, load): # Minion fired a bad retcode, fire an event self._fire_ret_load_specific_fun(load) + def remove_event_handler(self, event_handler): + """ + Remove the event_handler callback + """ + if event_handler in self.subscriber.callbacks: + self.subscriber.callbacks.remove(event_handler) + def set_event_handler(self, event_handler): """ Invoke the event_handler callback each time an event arrives. @@ -1001,6 +1008,7 @@ def set_event_handler(self, event_handler): assert not self._run_io_loop_sync if not self.cpub: self.connect_pub() + # This will handle reconnects self._schedule(self.subscriber.on_recv, event_handler) diff --git a/tests/pytests/unit/cli/test_batch.py b/tests/pytests/unit/cli/test_batch.py index 1e90219e4236..8a5d061bb889 100644 --- a/tests/pytests/unit/cli/test_batch.py +++ b/tests/pytests/unit/cli/test_batch.py @@ -4,7 +4,7 @@ import pytest -from salt.cli.batch import Batch +from salt.cli.batch import Batch, batch_get_opts, get_bnum from tests.support.mock import MagicMock, patch @@ -31,7 +31,7 @@ def test_get_bnum_str(batch): """ batch.opts = {"batch": "2", "timeout": 5} batch.minions = ["foo", "bar"] - assert Batch.get_bnum(batch) == 2 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 2 def test_get_bnum_int(batch): @@ -40,7 +40,7 @@ def test_get_bnum_int(batch): """ batch.opts = {"batch": 2, "timeout": 5} batch.minions = ["foo", "bar"] - assert Batch.get_bnum(batch) == 2 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 2 def test_get_bnum_percentage(batch): @@ -49,7 +49,7 @@ def test_get_bnum_percentage(batch): """ batch.opts = {"batch": "50%", "timeout": 5} batch.minions = ["foo"] - assert Batch.get_bnum(batch) == 1 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 1 def test_get_bnum_high_percentage(batch): @@ -58,14 +58,14 @@ def test_get_bnum_high_percentage(batch): """ batch.opts = {"batch": "160%", "timeout": 5} batch.minions = ["foo", "bar", "baz"] - assert Batch.get_bnum(batch) == 4 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 4 def test_get_bnum_invalid_batch_data(batch): """ Tests when an invalid batch value is passed """ - ret = Batch.get_bnum(batch) + ret = get_bnum(batch.opts, batch.minions, batch.quiet) assert ret is None @@ -266,7 +266,7 @@ def test_get_bnum_100_percentage_exact(batch): """ batch.opts = {"batch": "100%", "timeout": 5} batch.minions = ["a", "b", "c", "d"] - assert Batch.get_bnum(batch) == 4 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 4 def test_get_bnum_low_percentage_rounds_up(batch): @@ -276,7 +276,7 @@ def test_get_bnum_low_percentage_rounds_up(batch): """ batch.opts = {"batch": "1%", "timeout": 5} batch.minions = [f"m{i}" for i in range(10)] - assert Batch.get_bnum(batch) == 1 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 1 def test_get_bnum_zero(batch): @@ -286,7 +286,7 @@ def test_get_bnum_zero(batch): """ batch.opts = {"batch": 0, "timeout": 5} batch.minions = ["a", "b"] - assert Batch.get_bnum(batch) == 0 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 0 def test_get_bnum_percentage_no_minions(batch): @@ -296,7 +296,7 @@ def test_get_bnum_percentage_no_minions(batch): """ batch.opts = {"batch": "50%", "timeout": 5} batch.minions = [] - assert Batch.get_bnum(batch) == 0 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 0 def test_get_bnum_fractional_percentage(batch): @@ -306,7 +306,7 @@ def test_get_bnum_fractional_percentage(batch): """ batch.opts = {"batch": "33.3%", "timeout": 5} batch.minions = [f"m{i}" for i in range(10)] - assert Batch.get_bnum(batch) == 3 + assert get_bnum(batch.opts, batch.minions, batch.quiet) == 3 def test_run_no_minions_returns_early(batch): @@ -545,3 +545,66 @@ def _fake_sleep(duration): # dispatch. Without batch_wait, the second dispatch would happen # immediately and no 0.02s sleeps would be issued. assert spin_sleeps[0] >= 1 + + +def test_batch_presence_ping(batch): + """ + Tests passing batch_presence_ping_timeout and batch_presence_ping_gather_job_timeout + """ + ret = batch_get_opts("", "test.ping", "2", {}, timeout=20, gather_job_timeout=120) + assert ret["batch_presence_ping_timeout"] == 20 + assert ret["batch_presence_ping_gather_job_timeout"] == 120 + ret = batch_get_opts( + "", + "test.ping", + "2", + {}, + timeout=20, + gather_job_timeout=120, + batch_presence_ping_timeout=4, + batch_presence_ping_gather_job_timeout=360, + ) + assert ret["batch_presence_ping_timeout"] == 4 + assert ret["batch_presence_ping_gather_job_timeout"] == 360 + + +def test_gather_minions_with_batch_presence_ping(batch): + """ + Tests __gather_minions with batch_presence_ping options + """ + opts_no_pp = { + "batch": "2", + "conf_file": {}, + "tgt": "", + "transport": "", + "gather_job_timeout": 20, + } + opts_with_pp = { + "batch": "2", + "conf_file": {}, + "tgt": "", + "transport": "", + "timeout": 5, + "gather_job_timeout": 20, + "batch_presence_ping_timeout": 3, + "batch_presence_ping_gather_job_timeout": 4, + } + local_client_mock = MagicMock() + with patch( + "salt.client.get_local_client", MagicMock(return_value=local_client_mock) + ), patch("salt.client.LocalClient.cmd_iter", MagicMock(return_value=[])): + Batch(opts_no_pp).gather_minions() + Batch(opts_with_pp).gather_minions() + assert local_client_mock.mock_calls[0][1][3] == opts_no_pp["timeout"] + assert ( + local_client_mock.mock_calls[0][2]["gather_job_timeout"] + == opts_no_pp["gather_job_timeout"] + ) + assert ( + local_client_mock.mock_calls[2][1][3] + == opts_with_pp["batch_presence_ping_timeout"] + ) + assert ( + local_client_mock.mock_calls[2][2]["gather_job_timeout"] + == opts_with_pp["batch_presence_ping_gather_job_timeout"] + ) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py new file mode 100644 index 000000000000..bc871aba54c6 --- /dev/null +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -0,0 +1,430 @@ +import pytest + +import salt.ext.tornado +from salt.cli.batch_async import BatchAsync, batch_async_required +from tests.support.mock import MagicMock, patch + + +@pytest.fixture +def batch(temp_salt_master): + opts = { + "batch": "1", + "conf_file": {}, + "tgt": "*", + "timeout": 5, + "gather_job_timeout": 5, + "batch_presence_ping_timeout": 1, + "transport": None, + "sock_dir": "", + } + + with patch("salt.client.get_local_client", MagicMock(return_value=MagicMock())): + with patch("salt.cli.batch_async.batch_get_opts", MagicMock(return_value=opts)): + batch = BatchAsync( + opts, + MagicMock(side_effect=["1234", "1235"]), + { + "tgt": "", + "fun": "", + "kwargs": { + "batch": "", + "batch_presence_ping_timeout": 1, + "metadata": {"mykey": "myvalue"}, + }, + }, + ) + yield batch + + +@pytest.mark.parametrize( + "threshold,minions,batch,expected", + [ + (1, 2, 200, True), + (1, 500, 200, True), + (0, 2, 200, False), + (0, 500, 200, False), + (-1, 2, 200, False), + (-1, 500, 200, True), + (-1, 9, 10, False), + (-1, 11, 10, True), + (10, 9, 8, False), + (10, 9, 10, False), + (10, 11, 8, True), + (10, 11, 10, True), + ], +) +def test_batch_async_required(threshold, minions, batch, expected): + minions_list = [f"minion{i}.example.org" for i in range(minions)] + batch_async_opts = {"batch_async": {"threshold": threshold}} + extra = {"batch": batch} + assert batch_async_required(batch_async_opts, minions_list, extra) == expected + + +def test_ping_jid(batch): + assert batch.ping_jid == "1234" + + +def test_batch_jid(batch): + assert batch.batch_jid == "1235" + + +def test_batch_size(batch): + """ + Tests passing batch value as a number + """ + batch.opts = {"batch": "2", "timeout": 5} + batch.minions = {"foo", "bar"} + batch.start_batch() + assert batch.batch_size == 2 + + +def test_batch_start_on_batch_presence_ping_timeout(batch): + future_ret = salt.ext.tornado.gen.Future() + future_ret.set_result({"minions": ["foo", "bar"]}) + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "events_channel", MagicMock()), patch( + "salt.ext.tornado.gen.sleep", return_value=future + ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock: + batch.events_channel.local_client.run_job_async.return_value = future_ret + ret = batch.start() + # assert start_batch is called + start_batch_mock.assert_called_once() + # assert test.ping called + assert batch.events_channel.local_client.run_job_async.call_args[0] == ( + "*", + "test.ping", + [], + "glob", + ) + # assert targeted_minions == all minions matched by tgt + assert batch.targeted_minions == {"foo", "bar"} + + +def test_batch_start_on_gather_job_timeout(batch): + future = salt.ext.tornado.gen.Future() + future.set_result({}) + future_ret = salt.ext.tornado.gen.Future() + future_ret.set_result({"minions": ["foo", "bar"]}) + batch.batch_presence_ping_timeout = None + with patch.object(batch, "events_channel", MagicMock()), patch( + "salt.ext.tornado.gen.sleep", return_value=future + ), patch.object( + batch, "start_batch", return_value=future + ) as start_batch_mock, patch.object( + batch, "batch_presence_ping_timeout", None + ): + batch.events_channel.local_client.run_job_async.return_value = future_ret + # ret = batch_async.start(batch) + ret = batch.start() + # assert start_batch is called + start_batch_mock.assert_called_once() + + +def test_batch_fire_start_event(batch): + batch.minions = {"foo", "bar"} + batch.opts = {"batch": "2", "timeout": 5} + with patch.object(batch, "events_channel", MagicMock()): + batch.start_batch() + assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( + { + "available_minions": {"foo", "bar"}, + "down_minions": set(), + "metadata": batch.metadata, + }, + "salt/batch/1235/start", + ) + + +def test_start_batch_calls_next(batch): + batch.initialized = False + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "event", MagicMock()), patch.object( + batch, "events_channel", MagicMock() + ), patch.object(batch, "run_next", return_value=future) as run_next_mock: + batch.events_channel.master_event.fire_event_async.return_value = future + batch.start_batch() + assert batch.initialized + run_next_mock.assert_called_once() + + +def test_batch_fire_done_event(batch): + batch.targeted_minions = {"foo", "baz", "bar"} + batch.minions = {"foo", "bar"} + batch.done_minions = {"foo"} + batch.timedout_minions = {"bar"} + with patch.object(batch, "events_channel", MagicMock()): + batch.end_batch() + assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( + { + "available_minions": {"foo", "bar"}, + "done_minions": batch.done_minions, + "down_minions": {"baz"}, + "timedout_minions": batch.timedout_minions, + "metadata": batch.metadata, + }, + "salt/batch/1235/done", + ) + + +def test_batch_close_safe(batch): + with patch.object( + batch, "events_channel", MagicMock() + ) as events_channel_mock, patch.object(batch, "event", MagicMock()): + batch.close_safe() + batch.close_safe() + assert batch.events_channel is None + assert batch.event is None + events_channel_mock.unsubscribe.assert_called_once() + events_channel_mock.unuse.assert_called_once() + + +def test_batch_next(batch): + batch.opts["fun"] = "my.fun" + batch.opts["arg"] = [] + batch.batch_size = 2 + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + batch, "events_channel", MagicMock() + ), patch.object(batch, "_get_next", return_value={"foo", "bar"}), patch.object( + batch, "find_job", return_value=future + ) as find_job_mock: + batch.events_channel.local_client.run_job_async.return_value = future + batch.run_next() + assert batch.events_channel.local_client.run_job_async.call_args[0] == ( + {"foo", "bar"}, + "my.fun", + [], + "list", + ) + assert find_job_mock.call_args[0] == ({"foo", "bar"},) + assert batch.active == {"bar", "foo"} + + +def test_next_batch(batch): + batch.minions = {"foo", "bar"} + batch.batch_size = 2 + assert batch._get_next() == {"foo", "bar"} + + +def test_next_batch_one_done(batch): + batch.minions = {"foo", "bar"} + batch.done_minions = {"bar"} + batch.batch_size = 2 + assert batch._get_next() == {"foo"} + + +def test_next_batch_one_done_one_active(batch): + batch.minions = {"foo", "bar", "baz"} + batch.done_minions = {"bar"} + batch.active = {"baz"} + batch.batch_size = 2 + assert batch._get_next() == {"foo"} + + +def test_next_batch_one_done_one_active_one_timedout(batch): + batch.minions = {"foo", "bar", "baz", "faz"} + batch.done_minions = {"bar"} + batch.active = {"baz"} + batch.timedout_minions = {"faz"} + batch.batch_size = 2 + assert batch._get_next() == {"foo"} + + +def test_next_batch_bigger_size(batch): + batch.minions = {"foo", "bar"} + batch.batch_size = 3 + assert batch._get_next() == {"foo", "bar"} + + +def test_next_batch_all_done(batch): + batch.minions = {"foo", "bar"} + batch.done_minions = {"foo", "bar"} + batch.batch_size = 2 + assert batch._get_next() == set() + + +def test_next_batch_all_active(batch): + batch.minions = {"foo", "bar"} + batch.active = {"foo", "bar"} + batch.batch_size = 2 + assert batch._get_next() == set() + + +def test_next_batch_all_timedout(batch): + batch.minions = {"foo", "bar"} + batch.timedout_minions = {"foo", "bar"} + batch.batch_size = 2 + assert batch._get_next() == set() + + +def test_batch__event_handler_ping_return(batch): + batch.targeted_minions = {"foo"} + batch.start() + assert batch.minions == set() + batch._BatchAsync__event_handler( + "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" + ) + assert batch.minions == {"foo"} + assert batch.done_minions == set() + + +def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): + batch.targeted_minions = {"foo"} + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: + batch.start() + batch._BatchAsync__event_handler( + "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" + ) + start_batch_mock.assert_called_once() + + +def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): + batch.targeted_minions = {"foo", "bar"} + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: + batch.start() + batch._BatchAsync__event_handler( + "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" + ) + start_batch_mock.assert_not_called() + + +def test_batch__event_handler_batch_run_return(batch): + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object( + batch, "schedule_next", return_value=future + ) as schedule_next_mock: + batch.start() + batch.active = {"foo"} + batch._BatchAsync__event_handler( + "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run" + ) + assert batch.active == set() + assert batch.done_minions == {"foo"} + schedule_next_mock.assert_called_once() + + +def test_batch__event_handler_find_job_return(batch): + batch.start() + batch._BatchAsync__event_handler( + "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return" + ) + assert batch.find_job_returned == {"foo"} + + +def test_batch_run_next_end_batch_when_no_next(batch): + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object( + batch, "_get_next", return_value={} + ), patch.object( + batch, "end_batch", return_value=future + ) as end_batch_mock: + batch.run_next() + end_batch_mock.assert_called_once() + + +def test_batch_find_job(batch): + future = salt.ext.tornado.gen.Future() + future.set_result({}) + batch.minions = {"foo", "bar"} + with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + batch, "check_find_job", return_value=future + ) as check_find_job_mock, patch.object( + batch, "jid_gen", return_value="1236" + ): + batch.events_channel.local_client.run_job_async.return_value = future + batch.find_job({"foo", "bar"}) + assert check_find_job_mock.call_args[0] == ( + {"foo", "bar"}, + "1236", + ) + + +def test_batch_find_job_with_done_minions(batch): + batch.done_minions = {"bar"} + future = salt.ext.tornado.gen.Future() + future.set_result({}) + batch.minions = {"foo", "bar"} + with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + batch, "check_find_job", return_value=future + ) as check_find_job_mock, patch.object( + batch, "jid_gen", return_value="1236" + ): + batch.events_channel.local_client.run_job_async.return_value = future + batch.find_job({"foo", "bar"}) + assert check_find_job_mock.call_args[0] == ( + {"foo"}, + "1236", + ) + + +def test_batch_check_find_job_did_not_return(batch): + batch.active = {"foo"} + batch.find_job_returned = set() + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "find_job", return_value=future) as find_job_mock: + batch.check_find_job({"foo"}, jid="1234") + assert batch.find_job_returned == set() + assert batch.active == set() + find_job_mock.assert_not_called() + + +def test_batch_check_find_job_did_return(batch): + batch.find_job_returned = {"foo"} + future = salt.ext.tornado.gen.Future() + future.set_result({}) + with patch.object(batch, "find_job", return_value=future) as find_job_mock: + batch.check_find_job({"foo"}, jid="1234") + find_job_mock.assert_called_once_with({"foo"}) + + +def test_batch_check_find_job_multiple_states(batch): + # currently running minions + batch.active = {"foo", "bar"} + + # minion is running and find_job returns + batch.find_job_returned = {"foo"} + + # minion started running but find_job did not return + batch.timedout_minions = {"faz"} + + # minion finished + batch.done_minions = {"baz"} + + # both not yet done but only 'foo' responded to find_job + not_done = {"foo", "bar"} + + future = salt.ext.tornado.gen.Future() + future.set_result({}) + + with patch.object(batch, "schedule_next", return_value=future), patch.object( + batch, "find_job", return_value=future + ) as find_job_mock: + batch.check_find_job(not_done, jid="1234") + + # assert 'bar' removed from active + assert batch.active == {"foo"} + + # assert 'bar' added to timedout_minions + assert batch.timedout_minions == {"bar", "faz"} + + # assert 'find_job' schedueled again only for 'foo' + find_job_mock.assert_called_once_with({"foo"}) + + +def test_only_on_run_next_is_scheduled(batch): + future = salt.ext.tornado.gen.Future() + future.set_result({}) + batch.scheduled = True + with patch.object(batch, "run_next", return_value=future) as run_next_mock: + batch.schedule_next() + run_next_mock.assert_not_called() From 375089cdade67d1bb0e22c07511b72e2963416cc Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Fri, 24 May 2024 14:38:51 +0200 Subject: [PATCH 02/12] Use asyncio --- salt/cli/batch_async.py | 88 ++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 19f8e647d776..b9ffcf792dd4 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -2,14 +2,15 @@ Execute a job on the targeted minions by using a moving window of fixed size `batch`. """ +import asyncio import logging import re +import tornado import salt.client -import salt.ext.tornado import salt.utils.event from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum -from salt.ext.tornado.iostream import StreamClosedError +from tornado.iostream import StreamClosedError log = logging.getLogger(__name__) @@ -111,15 +112,13 @@ def unsubscribe(self, jid, op, subscriber_id): if not self._subscribers[subscriber_id]: del self._subscribers[subscriber_id] - @salt.ext.tornado.gen.coroutine - def __handle_close(self): + async def __handle_close(self): if not self._subscriptions: return log.warning("Master Event Subscriber was closed. Trying to reconnect...") - yield self.__reconnect_subscriber() + await self.__reconnect_subscriber() - @salt.ext.tornado.gen.coroutine - def __handle_event(self, raw): + async def __handle_event(self, raw): if self.master_event is None: return try: @@ -129,7 +128,7 @@ def __handle_event(self, raw): jid = tag_match.group(1) if jid in self._subscriptions: for op, _, handler in self._subscriptions[jid]: - yield handler(tag, data, op) + await handler(tag, data, op) except Exception as ex: # pylint: disable=W0703 log.error( "Exception occured while processing event: %s: %s", @@ -138,8 +137,7 @@ def __handle_event(self, raw): exc_info=True, ) - @salt.ext.tornado.gen.coroutine - def __reconnect_subscriber(self): + async def __reconnect_subscriber(self): if self.master_event.subscriber.connected() or self._reconnecting_subscriber: return self._reconnecting_subscriber = True @@ -152,7 +150,7 @@ def __reconnect_subscriber(self): max_tries, ) try: - yield self.master_event.subscriber.connect() + await self.master_event.subscriber.connect() except StreamClosedError: log.warning( "Unable to reconnect to event publisher (try %d of %d)", @@ -167,7 +165,7 @@ def __reconnect_subscriber(self): self._reconnecting_subscriber = False return if _try < max_tries: - yield salt.ext.tornado.gen.sleep(self._subscriber_reconnect_interval) + await asyncio.sleep(self._subscriber_reconnect_interval) _try += 1 self._reconnecting_subscriber = False @@ -228,7 +226,7 @@ def __init__(self, opts, jid_gen, clear_load): self.extra_job_kwargs[kwarg] = kwargs[kwarg] elif kwarg in opts: self.extra_job_kwargs[kwarg] = opts[kwarg] - self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() + self.io_loop = tornado.ioloop.IOLoop.current() self.events_channel = _get_shared_events_channel(opts, self.io_loop).use( id(self) ) @@ -276,8 +274,7 @@ def __set_event_handler(self): self.batch_jid, "batch_run", id(self), self.__event_handler ) - @salt.ext.tornado.gen.coroutine - def __event_handler(self, tag, data, op): + async def __event_handler(self, tag, data, op): if not self.event: return try: @@ -285,7 +282,7 @@ def __event_handler(self, tag, data, op): if op == "ping_return": self.minions.add(minion) if self.targeted_minions == self.minions: - yield self.start_batch() + await self.start_batch() elif op == "find_job_return": if data.get("return", None): self.find_job_returned.add(minion) @@ -293,7 +290,7 @@ def __event_handler(self, tag, data, op): if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - yield self.schedule_next() + await self.schedule_next() except Exception as ex: # pylint: disable=W0703 log.error( "Exception occured while processing event: %s: %s", @@ -314,8 +311,7 @@ def _get_next(self): ) return set(list(to_run)[:next_batch_size]) - @salt.ext.tornado.gen.coroutine - def check_find_job(self, batch_minions, jid): + async def check_find_job(self, batch_minions, jid): """ Check if the job with specified ``jid`` was finished on the minions """ @@ -333,14 +329,13 @@ def check_find_job(self, batch_minions, jid): ) if timedout_minions: - yield self.schedule_next() + await self.schedule_next() if self.event and running: self.find_job_returned = self.find_job_returned.difference(running) - yield self.find_job(running) + await self.find_job(running) - @salt.ext.tornado.gen.coroutine - def find_job(self, minions): + async def find_job(self, minions): """ Find if the job was finished on the minions """ @@ -356,7 +351,7 @@ def find_job(self, minions): self.events_channel.subscribe( jid, "find_job_return", id(self), self.__event_handler ) - ret = yield self.events_channel.local_client.run_job_async( + ret = await self.events_channel.local_client.run_job_async( not_done, "saltutil.find_job", [self.batch_jid], @@ -367,9 +362,9 @@ def find_job(self, minions): listen=False, **self.eauth, ) - yield salt.ext.tornado.gen.sleep(self.opts["gather_job_timeout"]) + await asyncio.sleep(self.opts["gather_job_timeout"]) if self.event: - yield self.check_find_job(not_done, jid) + await self.check_find_job(not_done, jid) except Exception as ex: # pylint: disable=W0703 log.error( "Exception occured handling batch async: %s. Aborting execution.", @@ -378,15 +373,14 @@ def find_job(self, minions): ) self.close_safe() - @salt.ext.tornado.gen.coroutine - def start(self): + async def start(self): """ Start the batch execution """ if not self.event: return self.__set_event_handler() - ping_return = yield self.events_channel.local_client.run_job_async( + ping_return = await self.events_channel.local_client.run_job_async( self.opts["tgt"], "test.ping", [], @@ -400,14 +394,13 @@ def start(self): ) self.targeted_minions = set(ping_return["minions"]) # start batching even if not all minions respond to ping - yield salt.ext.tornado.gen.sleep( + await asyncio.sleep( self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] ) if self.event: - yield self.start_batch() + await self.start_batch() - @salt.ext.tornado.gen.coroutine - def start_batch(self): + async def start_batch(self): """ Fire `salt/batch/*/start` and continue batch with `run_next` """ @@ -420,14 +413,13 @@ def start_batch(self): "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata, } - yield self.events_channel.master_event.fire_event_async( + await self.events_channel.master_event.fire_event_async( data, f"salt/batch/{self.batch_jid}/start" ) if self.event: - yield self.run_next() + await self.run_next() - @salt.ext.tornado.gen.coroutine - def end_batch(self): + async def end_batch(self): """ End the batch and call safe closing """ @@ -446,13 +438,13 @@ def end_batch(self): "timedout_minions": self.timedout_minions, "metadata": self.metadata, } - yield self.events_channel.master_event.fire_event_async( + await self.events_channel.master_event.fire_event_async( data, f"salt/batch/{self.batch_jid}/done" ) # release to the IOLoop to allow the event to be published # before closing batch async execution - yield salt.ext.tornado.gen.sleep(1) + await asyncio.sleep(1) self.close_safe() def close_safe(self): @@ -463,29 +455,27 @@ def close_safe(self): _destroy_unused_shared_events_channel() self.event = None - @salt.ext.tornado.gen.coroutine - def schedule_next(self): + async def schedule_next(self): if self.scheduled: return self.scheduled = True # call later so that we maybe gather more returns - yield salt.ext.tornado.gen.sleep(self.batch_delay) + await asyncio.sleep(self.batch_delay) if self.event: - yield self.run_next() + await self.run_next() - @salt.ext.tornado.gen.coroutine - def run_next(self): + async def run_next(self): """ Continue batch execution with the next targets """ self.scheduled = False next_batch = self._get_next() if not next_batch: - yield self.end_batch() + await self.end_batch() return self.active = self.active.union(next_batch) try: - ret = yield self.events_channel.local_client.run_job_async( + ret = await self.events_channel.local_client.run_job_async( next_batch, self.opts["fun"], self.opts["arg"], @@ -501,11 +491,11 @@ def run_next(self): **self.extra_job_kwargs, ) - yield salt.ext.tornado.gen.sleep(self.opts["timeout"]) + await asyncio.sleep(self.opts["timeout"]) # The batch can be done already at this point, which means no self.event if self.event: - yield self.find_job(set(next_batch)) + await self.find_job(set(next_batch)) except Exception as ex: # pylint: disable=W0703 log.error( "Error in scheduling next batch: %s. Aborting execution", From 69af91c87d1d2eb4300946886f94452a73d6076c Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Fri, 24 May 2024 14:49:36 +0200 Subject: [PATCH 03/12] Fix tornado usage --- tests/pytests/unit/cli/test_batch_async.py | 44 +++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index bc871aba54c6..56611c833fcd 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -1,6 +1,6 @@ import pytest +import tornado -import salt.ext.tornado from salt.cli.batch_async import BatchAsync, batch_async_required from tests.support.mock import MagicMock, patch @@ -79,12 +79,12 @@ def test_batch_size(batch): def test_batch_start_on_batch_presence_ping_timeout(batch): - future_ret = salt.ext.tornado.gen.Future() + future_ret = tornado.gen.Future() future_ret.set_result({"minions": ["foo", "bar"]}) - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "events_channel", MagicMock()), patch( - "salt.ext.tornado.gen.sleep", return_value=future + "tornado.gen.sleep", return_value=future ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock: batch.events_channel.local_client.run_job_async.return_value = future_ret ret = batch.start() @@ -102,13 +102,13 @@ def test_batch_start_on_batch_presence_ping_timeout(batch): def test_batch_start_on_gather_job_timeout(batch): - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) - future_ret = salt.ext.tornado.gen.Future() + future_ret = tornado.gen.Future() future_ret.set_result({"minions": ["foo", "bar"]}) batch.batch_presence_ping_timeout = None with patch.object(batch, "events_channel", MagicMock()), patch( - "salt.ext.tornado.gen.sleep", return_value=future + "tornado.gen.sleep", return_value=future ), patch.object( batch, "start_batch", return_value=future ) as start_batch_mock, patch.object( @@ -138,7 +138,7 @@ def test_batch_fire_start_event(batch): def test_start_batch_calls_next(batch): batch.initialized = False - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "event", MagicMock()), patch.object( batch, "events_channel", MagicMock() @@ -184,9 +184,9 @@ def test_batch_next(batch): batch.opts["fun"] = "my.fun" batch.opts["arg"] = [] batch.batch_size = 2 - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) - with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + with patch("tornado.gen.sleep", return_value=future), patch.object( batch, "events_channel", MagicMock() ), patch.object(batch, "_get_next", return_value={"foo", "bar"}), patch.object( batch, "find_job", return_value=future @@ -273,7 +273,7 @@ def test_batch__event_handler_ping_return(batch): def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): batch.targeted_minions = {"foo"} - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: batch.start() @@ -285,7 +285,7 @@ def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): batch.targeted_minions = {"foo", "bar"} - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: batch.start() @@ -296,7 +296,7 @@ def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(bat def test_batch__event_handler_batch_run_return(batch): - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object( batch, "schedule_next", return_value=future @@ -320,7 +320,7 @@ def test_batch__event_handler_find_job_return(batch): def test_batch_run_next_end_batch_when_no_next(batch): - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object( batch, "_get_next", return_value={} @@ -332,10 +332,10 @@ def test_batch_run_next_end_batch_when_no_next(batch): def test_batch_find_job(batch): - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) batch.minions = {"foo", "bar"} - with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + with patch("tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future ) as check_find_job_mock, patch.object( batch, "jid_gen", return_value="1236" @@ -350,10 +350,10 @@ def test_batch_find_job(batch): def test_batch_find_job_with_done_minions(batch): batch.done_minions = {"bar"} - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) batch.minions = {"foo", "bar"} - with patch("salt.ext.tornado.gen.sleep", return_value=future), patch.object( + with patch("tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future ) as check_find_job_mock, patch.object( batch, "jid_gen", return_value="1236" @@ -369,7 +369,7 @@ def test_batch_find_job_with_done_minions(batch): def test_batch_check_find_job_did_not_return(batch): batch.active = {"foo"} batch.find_job_returned = set() - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "find_job", return_value=future) as find_job_mock: batch.check_find_job({"foo"}, jid="1234") @@ -380,7 +380,7 @@ def test_batch_check_find_job_did_not_return(batch): def test_batch_check_find_job_did_return(batch): batch.find_job_returned = {"foo"} - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "find_job", return_value=future) as find_job_mock: batch.check_find_job({"foo"}, jid="1234") @@ -403,7 +403,7 @@ def test_batch_check_find_job_multiple_states(batch): # both not yet done but only 'foo' responded to find_job not_done = {"foo", "bar"} - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "schedule_next", return_value=future), patch.object( @@ -422,7 +422,7 @@ def test_batch_check_find_job_multiple_states(batch): def test_only_on_run_next_is_scheduled(batch): - future = salt.ext.tornado.gen.Future() + future = tornado.gen.Future() future.set_result({}) batch.scheduled = True with patch.object(batch, "run_next", return_value=future) as run_next_mock: From d660bfb7feb9e80cb57b340df7b474ceba7e098d Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Fri, 24 May 2024 15:13:40 +0200 Subject: [PATCH 04/12] Fix async tests --- tests/pytests/unit/cli/test_batch_async.py | 74 +++++++++++----------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index 56611c833fcd..2c14d4dc9d8b 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -1,3 +1,4 @@ +import asyncio import pytest import tornado @@ -68,17 +69,17 @@ def test_batch_jid(batch): assert batch.batch_jid == "1235" -def test_batch_size(batch): +async def test_batch_size(batch): """ Tests passing batch value as a number """ batch.opts = {"batch": "2", "timeout": 5} batch.minions = {"foo", "bar"} - batch.start_batch() + await batch.start_batch() assert batch.batch_size == 2 -def test_batch_start_on_batch_presence_ping_timeout(batch): +async def test_batch_start_on_batch_presence_ping_timeout(batch): future_ret = tornado.gen.Future() future_ret.set_result({"minions": ["foo", "bar"]}) future = tornado.gen.Future() @@ -87,7 +88,7 @@ def test_batch_start_on_batch_presence_ping_timeout(batch): "tornado.gen.sleep", return_value=future ), patch.object(batch, "start_batch", return_value=future) as start_batch_mock: batch.events_channel.local_client.run_job_async.return_value = future_ret - ret = batch.start() + ret = await batch.start() # assert start_batch is called start_batch_mock.assert_called_once() # assert test.ping called @@ -101,7 +102,7 @@ def test_batch_start_on_batch_presence_ping_timeout(batch): assert batch.targeted_minions == {"foo", "bar"} -def test_batch_start_on_gather_job_timeout(batch): +async def test_batch_start_on_gather_job_timeout(batch): future = tornado.gen.Future() future.set_result({}) future_ret = tornado.gen.Future() @@ -116,16 +117,16 @@ def test_batch_start_on_gather_job_timeout(batch): ): batch.events_channel.local_client.run_job_async.return_value = future_ret # ret = batch_async.start(batch) - ret = batch.start() + ret = await batch.start() # assert start_batch is called start_batch_mock.assert_called_once() -def test_batch_fire_start_event(batch): +async def test_batch_fire_start_event(batch): batch.minions = {"foo", "bar"} batch.opts = {"batch": "2", "timeout": 5} with patch.object(batch, "events_channel", MagicMock()): - batch.start_batch() + await batch.start_batch() assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( { "available_minions": {"foo", "bar"}, @@ -136,7 +137,7 @@ def test_batch_fire_start_event(batch): ) -def test_start_batch_calls_next(batch): +async def test_start_batch_calls_next(batch): batch.initialized = False future = tornado.gen.Future() future.set_result({}) @@ -144,18 +145,18 @@ def test_start_batch_calls_next(batch): batch, "events_channel", MagicMock() ), patch.object(batch, "run_next", return_value=future) as run_next_mock: batch.events_channel.master_event.fire_event_async.return_value = future - batch.start_batch() + await batch.start_batch() assert batch.initialized run_next_mock.assert_called_once() -def test_batch_fire_done_event(batch): +async def test_batch_fire_done_event(batch): batch.targeted_minions = {"foo", "baz", "bar"} batch.minions = {"foo", "bar"} batch.done_minions = {"foo"} batch.timedout_minions = {"bar"} with patch.object(batch, "events_channel", MagicMock()): - batch.end_batch() + await batch.end_batch() assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( { "available_minions": {"foo", "bar"}, @@ -180,7 +181,7 @@ def test_batch_close_safe(batch): events_channel_mock.unuse.assert_called_once() -def test_batch_next(batch): +async def test_batch_next(batch): batch.opts["fun"] = "my.fun" batch.opts["arg"] = [] batch.batch_size = 2 @@ -192,7 +193,7 @@ def test_batch_next(batch): batch, "find_job", return_value=future ) as find_job_mock: batch.events_channel.local_client.run_job_async.return_value = future - batch.run_next() + await batch.run_next() assert batch.events_channel.local_client.run_job_async.call_args[0] == ( {"foo", "bar"}, "my.fun", @@ -260,9 +261,9 @@ def test_next_batch_all_timedout(batch): assert batch._get_next() == set() -def test_batch__event_handler_ping_return(batch): +async def test_batch__event_handler_ping_return(batch): batch.targeted_minions = {"foo"} - batch.start() + await batch.start() assert batch.minions == set() batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" @@ -271,37 +272,37 @@ def test_batch__event_handler_ping_return(batch): assert batch.done_minions == set() -def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): +async def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): batch.targeted_minions = {"foo"} future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - batch.start() + await batch.start() batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) start_batch_mock.assert_called_once() -def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): +async def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): batch.targeted_minions = {"foo", "bar"} future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - batch.start() + await batch.start() batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) start_batch_mock.assert_not_called() -def test_batch__event_handler_batch_run_return(batch): +async def test_batch__event_handler_batch_run_return(batch): future = tornado.gen.Future() future.set_result({}) with patch.object( batch, "schedule_next", return_value=future ) as schedule_next_mock: - batch.start() + await batch.start() batch.active = {"foo"} batch._BatchAsync__event_handler( "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run" @@ -311,15 +312,15 @@ def test_batch__event_handler_batch_run_return(batch): schedule_next_mock.assert_called_once() -def test_batch__event_handler_find_job_return(batch): - batch.start() +async def test_batch__event_handler_find_job_return(batch): + await batch.start() batch._BatchAsync__event_handler( "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return" ) assert batch.find_job_returned == {"foo"} -def test_batch_run_next_end_batch_when_no_next(batch): +async def test_batch_run_next_end_batch_when_no_next(batch): future = tornado.gen.Future() future.set_result({}) with patch.object( @@ -327,11 +328,11 @@ def test_batch_run_next_end_batch_when_no_next(batch): ), patch.object( batch, "end_batch", return_value=future ) as end_batch_mock: - batch.run_next() + await batch.run_next() end_batch_mock.assert_called_once() -def test_batch_find_job(batch): +async def test_batch_find_job(batch): future = tornado.gen.Future() future.set_result({}) batch.minions = {"foo", "bar"} @@ -341,14 +342,14 @@ def test_batch_find_job(batch): batch, "jid_gen", return_value="1236" ): batch.events_channel.local_client.run_job_async.return_value = future - batch.find_job({"foo", "bar"}) + await batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( {"foo", "bar"}, "1236", ) -def test_batch_find_job_with_done_minions(batch): +async def test_batch_find_job_with_done_minions(batch): batch.done_minions = {"bar"} future = tornado.gen.Future() future.set_result({}) @@ -359,35 +360,36 @@ def test_batch_find_job_with_done_minions(batch): batch, "jid_gen", return_value="1236" ): batch.events_channel.local_client.run_job_async.return_value = future - batch.find_job({"foo", "bar"}) + await batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( {"foo"}, "1236", ) -def test_batch_check_find_job_did_not_return(batch): +async def test_batch_check_find_job_did_not_return(batch): batch.active = {"foo"} batch.find_job_returned = set() + batch.batch_size = 2 future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "find_job", return_value=future) as find_job_mock: - batch.check_find_job({"foo"}, jid="1234") + await batch.check_find_job({"foo"}, jid="1234") assert batch.find_job_returned == set() assert batch.active == set() find_job_mock.assert_not_called() -def test_batch_check_find_job_did_return(batch): +async def test_batch_check_find_job_did_return(batch): batch.find_job_returned = {"foo"} future = tornado.gen.Future() future.set_result({}) with patch.object(batch, "find_job", return_value=future) as find_job_mock: - batch.check_find_job({"foo"}, jid="1234") + await batch.check_find_job({"foo"}, jid="1234") find_job_mock.assert_called_once_with({"foo"}) -def test_batch_check_find_job_multiple_states(batch): +async def test_batch_check_find_job_multiple_states(batch): # currently running minions batch.active = {"foo", "bar"} @@ -409,7 +411,7 @@ def test_batch_check_find_job_multiple_states(batch): with patch.object(batch, "schedule_next", return_value=future), patch.object( batch, "find_job", return_value=future ) as find_job_mock: - batch.check_find_job(not_done, jid="1234") + await batch.check_find_job(not_done, jid="1234") # assert 'bar' removed from active assert batch.active == {"foo"} From 623b8bb4e3053bb34c4c402dbaff60dce4dc200b Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Sat, 25 May 2024 13:10:37 +0200 Subject: [PATCH 05/12] Fix connected check --- salt/cli/batch_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index b9ffcf792dd4..a5012e51d626 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -91,7 +91,7 @@ def subscribe(self, jid, op, subscriber_id, handler): self._subscribers[subscriber_id].add(jid) if (op, subscriber_id, handler) not in self._subscriptions[jid]: self._subscriptions[jid].append((op, subscriber_id, handler)) - if not self.master_event.subscriber.connected(): + if not self.master_event.subscriber.connected: self.__reconnect_subscriber() def unsubscribe(self, jid, op, subscriber_id): From 413011b690aa5fbf047fc58eec4ffc9592e4d302 Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Mon, 27 May 2024 09:46:56 +0200 Subject: [PATCH 06/12] Remove remove_event_handler --- salt/cli/batch_async.py | 1 - salt/utils/event.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index a5012e51d626..a2adbd594418 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -179,7 +179,6 @@ def unuse(self, subscriber_id): def destroy_unused(self): if self._used_by: return False - self.master_event.remove_event_handler(self.__handle_event) self.master_event.destroy() self.master_event = None self.local_client.destroy() diff --git a/salt/utils/event.py b/salt/utils/event.py index bb80f6551588..608c0098b454 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -994,13 +994,6 @@ def fire_ret_load(self, load): # Minion fired a bad retcode, fire an event self._fire_ret_load_specific_fun(load) - def remove_event_handler(self, event_handler): - """ - Remove the event_handler callback - """ - if event_handler in self.subscriber.callbacks: - self.subscriber.callbacks.remove(event_handler) - def set_event_handler(self, event_handler): """ Invoke the event_handler callback each time an event arrives. From dcf7ff20d2aec6e3710916d7088bfc63ea50e11d Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Mon, 27 May 2024 11:34:44 +0200 Subject: [PATCH 07/12] Fix batch call --- salt/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/master.py b/salt/master.py index 4f4e59ce3a31..65b4d79a25e3 100644 --- a/salt/master.py +++ b/salt/master.py @@ -3835,7 +3835,7 @@ async def publish_batch(self, clear_load, minions, missing): lambda: self._prep_jid(clear_load, {}), batch_load, ) - asyncio.create_task(batch.start) + asyncio.create_task(batch.start()) return { "enc": "clear", From a6746d7dda898f249c23473989fad9a0a8dc76e4 Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Mon, 27 May 2024 12:01:40 +0200 Subject: [PATCH 08/12] Fix async tests --- tests/pytests/unit/cli/test_batch_async.py | 98 ++++++++++++++-------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index 2c14d4dc9d8b..5bf96d294955 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -3,7 +3,7 @@ import tornado from salt.cli.batch_async import BatchAsync, batch_async_required -from tests.support.mock import MagicMock, patch +from tests.support.mock import AsyncMock, MagicMock, patch @pytest.fixture @@ -125,9 +125,13 @@ async def test_batch_start_on_gather_job_timeout(batch): async def test_batch_fire_start_event(batch): batch.minions = {"foo", "bar"} batch.opts = {"batch": "2", "timeout": 5} - with patch.object(batch, "events_channel", MagicMock()): + with patch.object(batch, "events_channel", MagicMock()), patch.object( + batch.events_channel.master_event, + "fire_event_async", + AsyncMock(return_value={}), + ) as fire_event_async_mock: await batch.start_batch() - assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( + assert fire_event_async_mock.call_args[0] == ( { "available_minions": {"foo", "bar"}, "down_minions": set(), @@ -155,9 +159,13 @@ async def test_batch_fire_done_event(batch): batch.minions = {"foo", "bar"} batch.done_minions = {"foo"} batch.timedout_minions = {"bar"} - with patch.object(batch, "events_channel", MagicMock()): + with patch.object(batch, "events_channel", MagicMock()), patch.object( + batch.events_channel.master_event, + "fire_event_async", + AsyncMock(return_value={}), + ) as fire_event_async_mock: await batch.end_batch() - assert batch.events_channel.master_event.fire_event_async.call_args[0] == ( + assert fire_event_async_mock.call_args[0] == ( { "available_minions": {"foo", "bar"}, "done_minions": batch.done_minions, @@ -263,34 +271,59 @@ def test_next_batch_all_timedout(batch): async def test_batch__event_handler_ping_return(batch): batch.targeted_minions = {"foo"} - await batch.start() - assert batch.minions == set() - batch._BatchAsync__event_handler( - "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" - ) - assert batch.minions == {"foo"} - assert batch.done_minions == set() + with patch.object( + batch.events_channel.local_client, + "run_job_async", + AsyncMock(return_value={"minions": ["foo"]}), + ) as local_client_mock, patch.object( + batch.events_channel.master_event, + "fire_event_async", + AsyncMock(return_value={}), + ) as fire_event_async_mock: + asyncio.create_task(batch.start()) + assert batch.minions == set() + await batch._BatchAsync__event_handler( + "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" + ) + assert batch.minions == {"foo"} + assert batch.done_minions == set() async def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): batch.targeted_minions = {"foo"} future = tornado.gen.Future() future.set_result({}) - with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - await batch.start() - batch._BatchAsync__event_handler( + with patch.object( + batch.events_channel.local_client, + "run_job_async", + AsyncMock(return_value={"minions": ["foo"]}), + ) as local_client_mock, patch.object( + batch, "start_batch", AsyncMock(return_value=future) + ) as start_batch_mock: + asyncio.create_task(batch.start()) + await asyncio.sleep(1) + await batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) start_batch_mock.assert_called_once() -async def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(batch): +async def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return( + batch, +): batch.targeted_minions = {"foo", "bar"} future = tornado.gen.Future() future.set_result({}) - with patch.object(batch, "start_batch", return_value=future) as start_batch_mock: - await batch.start() - batch._BatchAsync__event_handler( + with patch.object( + batch.events_channel.local_client, + "run_job_async", + AsyncMock(return_value={"minions": ["foo", "bar"]}), + ) as local_client_mock, patch.object( + batch, "start_batch", AsyncMock(return_value=future) + ) as start_batch_mock: + asyncio.create_task(batch.start()) + await asyncio.sleep(1) + await batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) start_batch_mock.assert_not_called() @@ -300,11 +333,16 @@ async def test_batch__event_handler_batch_run_return(batch): future = tornado.gen.Future() future.set_result({}) with patch.object( - batch, "schedule_next", return_value=future + batch.events_channel.local_client, + "run_job_async", + AsyncMock(return_value={"minions": ["foo", "bar"]}), + ) as local_client_mock, patch.object( + batch, "schedule_next", AsyncMock(return_value=future) ) as schedule_next_mock: - await batch.start() + asyncio.create_task(batch.start()) + await asyncio.sleep(1) batch.active = {"foo"} - batch._BatchAsync__event_handler( + await batch._BatchAsync__event_handler( "salt/job/1235/ret/foo", {"id": "foo"}, "batch_run" ) assert batch.active == set() @@ -313,8 +351,8 @@ async def test_batch__event_handler_batch_run_return(batch): async def test_batch__event_handler_find_job_return(batch): - await batch.start() - batch._BatchAsync__event_handler( + asyncio.create_task(batch.start()) + await batch._BatchAsync__event_handler( "salt/job/1236/ret/foo", {"id": "foo", "return": "deadbeaf"}, "find_job_return" ) assert batch.find_job_returned == {"foo"} @@ -323,9 +361,7 @@ async def test_batch__event_handler_find_job_return(batch): async def test_batch_run_next_end_batch_when_no_next(batch): future = tornado.gen.Future() future.set_result({}) - with patch.object( - batch, "_get_next", return_value={} - ), patch.object( + with patch.object(batch, "_get_next", return_value={}), patch.object( batch, "end_batch", return_value=future ) as end_batch_mock: await batch.run_next() @@ -338,9 +374,7 @@ async def test_batch_find_job(batch): batch.minions = {"foo", "bar"} with patch("tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future - ) as check_find_job_mock, patch.object( - batch, "jid_gen", return_value="1236" - ): + ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"): batch.events_channel.local_client.run_job_async.return_value = future await batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( @@ -356,9 +390,7 @@ async def test_batch_find_job_with_done_minions(batch): batch.minions = {"foo", "bar"} with patch("tornado.gen.sleep", return_value=future), patch.object( batch, "check_find_job", return_value=future - ) as check_find_job_mock, patch.object( - batch, "jid_gen", return_value="1236" - ): + ) as check_find_job_mock, patch.object(batch, "jid_gen", return_value="1236"): batch.events_channel.local_client.run_job_async.return_value = future await batch.find_job({"foo", "bar"}) assert check_find_job_mock.call_args[0] == ( From 43f43be81b9a2725d71df330ecc5fb82267986c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= Date: Fri, 20 Dec 2024 16:40:34 +0000 Subject: [PATCH 09/12] Make batch async working with asyncio batch async: enhance trace logging batch async: do not block event handler additionally, only try to execute next batch once all previous minions has been processed somehow batch async: do not block current find_job execution batch async: just start batch if all minions returns the ping batch_async: ensure the start/done events are fired batch async: do not wait to finalize batch execution batch async: only trigger find job for current batch active minions Prevent traceback due missing 'send' method when socket was closed Modify some batch_async trace logs --- salt/cli/batch_async.py | 61 +++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index a2adbd594418..88781bee0bc5 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -2,10 +2,12 @@ Execute a job on the targeted minions by using a moving window of fixed size `batch`. """ -import asyncio +# pylint: enable=import-error,no-name-in-module,redefined-builtin import logging import re + import tornado +import asyncio import salt.client import salt.utils.event @@ -138,7 +140,7 @@ async def __handle_event(self, raw): ) async def __reconnect_subscriber(self): - if self.master_event.subscriber.connected() or self._reconnecting_subscriber: + if self.master_event.subscriber.connected or self._reconnecting_subscriber: return self._reconnecting_subscriber = True max_tries = max(1, int(self._subscriber_reconnect_tries)) @@ -157,8 +159,8 @@ async def __reconnect_subscriber(self): _try, max_tries, ) - if self.master_event.subscriber.connected(): - self.master_event.subscriber.stream.set_close_callback( + if self.master_event.subscriber.connected: + self.master_event.subscriber._stream.set_close_callback( self.__handle_close ) log.info("Event publisher connection restored") @@ -177,6 +179,7 @@ def unuse(self, subscriber_id): self._used_by.discard(subscriber_id) def destroy_unused(self): + log.trace("SharedEventsChannel.destroy_unused called") if self._used_by: return False self.master_event.destroy() @@ -274,14 +277,14 @@ def __set_event_handler(self): ) async def __event_handler(self, tag, data, op): + # IMPORTANT: This function must run fast and not wait for any other task, + # otherwise it would cause events to be stuck. if not self.event: return try: minion = data["id"] if op == "ping_return": self.minions.add(minion) - if self.targeted_minions == self.minions: - await self.start_batch() elif op == "find_job_return": if data.get("return", None): self.find_job_returned.add(minion) @@ -289,7 +292,8 @@ async def __event_handler(self, tag, data, op): if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - await self.schedule_next() + if not self.active: + asyncio.create_task(self.schedule_next()) except Exception as ex: # pylint: disable=W0703 log.error( "Exception occured while processing event: %s: %s", @@ -328,7 +332,7 @@ async def check_find_job(self, batch_minions, jid): ) if timedout_minions: - await self.schedule_next() + asyncio.create_task(self.schedule_next()) if self.event and running: self.find_job_returned = self.find_job_returned.difference(running) @@ -338,6 +342,7 @@ async def find_job(self, minions): """ Find if the job was finished on the minions """ + log.trace("BatchAsync.find_job called for minions: %s", minions) if not self.event: return not_done = minions.difference(self.done_minions).difference( @@ -350,7 +355,7 @@ async def find_job(self, minions): self.events_channel.subscribe( jid, "find_job_return", id(self), self.__event_handler ) - ret = await self.events_channel.local_client.run_job_async( + await self.events_channel.local_client.run_job_async( not_done, "saltutil.find_job", [self.batch_jid], @@ -393,9 +398,18 @@ async def start(self): ) self.targeted_minions = set(ping_return["minions"]) # start batching even if not all minions respond to ping - await asyncio.sleep( - self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] - ) + try: + async with asyncio.timeout( + self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] + ): + while True: + await asyncio.sleep(0.03) + if self.targeted_minions == self.minions: + break + except TimeoutError: + # Some minions are down, scheduling batch anyway + pass + if self.event: await self.start_batch() @@ -412,8 +426,8 @@ async def start_batch(self): "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata, } - await self.events_channel.master_event.fire_event_async( - data, f"salt/batch/{self.batch_jid}/start" + ret = self.event.fire_event( + data, "salt/batch/{}/start".format(self.batch_jid) ) if self.event: await self.run_next() @@ -422,6 +436,7 @@ async def end_batch(self): """ End the batch and call safe closing """ + log.trace("BatchAsync.end_batch called") left = self.minions.symmetric_difference( self.done_minions.union(self.timedout_minions) ) @@ -437,16 +452,17 @@ async def end_batch(self): "timedout_minions": self.timedout_minions, "metadata": self.metadata, } - await self.events_channel.master_event.fire_event_async( + ret = self.event.fire_event( data, f"salt/batch/{self.batch_jid}/done" ) # release to the IOLoop to allow the event to be published # before closing batch async execution - await asyncio.sleep(1) + await asyncio.sleep(0.03) self.close_safe() def close_safe(self): + log.trace("BatchAsync.close_safe called") if self.events_channel is not None: self.events_channel.unsubscribe(None, None, id(self)) self.events_channel.unuse(id(self)) @@ -455,11 +471,15 @@ def close_safe(self): self.event = None async def schedule_next(self): + log.trace("BatchAsync.schedule_next called") if self.scheduled: + log.trace("BatchAsync.schedule_next -> Batch already scheduled, nothing to do.") return self.scheduled = True - # call later so that we maybe gather more returns - await asyncio.sleep(self.batch_delay) + if self._get_next(): + # call later so that we maybe gather more returns + log.trace("BatchAsync.schedule_next delaying batch %s second(s).", self.batch_delay) + await asyncio.sleep(self.batch_delay) if self.event: await self.run_next() @@ -469,12 +489,13 @@ async def run_next(self): """ self.scheduled = False next_batch = self._get_next() + log.trace("BatchAsync.run_next called. Next Batch -> %s", next_batch) if not next_batch: await self.end_batch() return self.active = self.active.union(next_batch) try: - ret = await self.events_channel.local_client.run_job_async( + await self.events_channel.local_client.run_job_async( next_batch, self.opts["fun"], self.opts["arg"], @@ -493,7 +514,7 @@ async def run_next(self): await asyncio.sleep(self.opts["timeout"]) # The batch can be done already at this point, which means no self.event - if self.event: + if self.event and self.active.intersection(next_batch): await self.find_job(set(next_batch)) except Exception as ex: # pylint: disable=W0703 log.error( From 81f3307dcb829c9ee2374f692fd0e3b10bfcceb7 Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Tue, 21 Jan 2025 11:37:14 +0100 Subject: [PATCH 10/12] Make pre-commit check happy --- salt/cli/batch_async.py | 24 +++++++++++----------- salt/client/__init__.py | 2 +- tests/pytests/unit/cli/test_batch_async.py | 1 + 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 88781bee0bc5..949ec196ae04 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -2,17 +2,16 @@ Execute a job on the targeted minions by using a moving window of fixed size `batch`. """ -# pylint: enable=import-error,no-name-in-module,redefined-builtin +import asyncio import logging import re import tornado -import asyncio +from tornado.iostream import StreamClosedError import salt.client import salt.utils.event from salt.cli.batch import batch_get_eauth, batch_get_opts, get_bnum -from tornado.iostream import StreamClosedError log = logging.getLogger(__name__) @@ -402,7 +401,7 @@ async def start(self): async with asyncio.timeout( self.batch_presence_ping_timeout or self.opts["gather_job_timeout"] ): - while True: + while True: await asyncio.sleep(0.03) if self.targeted_minions == self.minions: break @@ -426,9 +425,7 @@ async def start_batch(self): "down_minions": self.targeted_minions.difference(self.minions), "metadata": self.metadata, } - ret = self.event.fire_event( - data, "salt/batch/{}/start".format(self.batch_jid) - ) + ret = self.event.fire_event(data, f"salt/batch/{self.batch_jid}/start") if self.event: await self.run_next() @@ -452,9 +449,7 @@ async def end_batch(self): "timedout_minions": self.timedout_minions, "metadata": self.metadata, } - ret = self.event.fire_event( - data, f"salt/batch/{self.batch_jid}/done" - ) + ret = self.event.fire_event(data, f"salt/batch/{self.batch_jid}/done") # release to the IOLoop to allow the event to be published # before closing batch async execution @@ -473,12 +468,17 @@ def close_safe(self): async def schedule_next(self): log.trace("BatchAsync.schedule_next called") if self.scheduled: - log.trace("BatchAsync.schedule_next -> Batch already scheduled, nothing to do.") + log.trace( + "BatchAsync.schedule_next -> Batch already scheduled, nothing to do." + ) return self.scheduled = True if self._get_next(): # call later so that we maybe gather more returns - log.trace("BatchAsync.schedule_next delaying batch %s second(s).", self.batch_delay) + log.trace( + "BatchAsync.schedule_next delaying batch %s second(s).", + self.batch_delay, + ) await asyncio.sleep(self.batch_delay) if self.event: await self.run_next() diff --git a/salt/client/__init__.py b/salt/client/__init__.py index b1993a97edcc..b1fe63a7cfb6 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -754,7 +754,7 @@ def cmd_batch( tgt_type=tgt_type, ret=ret, kwarg=kwarg, - **kwargs + **kwargs, ) eauth = salt.cli.batch.batch_get_eauth(kwargs) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index 5bf96d294955..6723027a2dc0 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -1,4 +1,5 @@ import asyncio + import pytest import tornado From aef3b5ff77aeaa4c79a3413cbef011396e0651ff Mon Sep 17 00:00:00 2001 From: vzhestkov Date: Tue, 24 Mar 2026 10:28:53 +0100 Subject: [PATCH 11/12] Fix batch_async test after alignment with asyncio --- tests/pytests/unit/cli/test_batch_async.py | 32 ++++++++++------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/pytests/unit/cli/test_batch_async.py b/tests/pytests/unit/cli/test_batch_async.py index 6723027a2dc0..92e6f195aadb 100644 --- a/tests/pytests/unit/cli/test_batch_async.py +++ b/tests/pytests/unit/cli/test_batch_async.py @@ -127,12 +127,12 @@ async def test_batch_fire_start_event(batch): batch.minions = {"foo", "bar"} batch.opts = {"batch": "2", "timeout": 5} with patch.object(batch, "events_channel", MagicMock()), patch.object( - batch.events_channel.master_event, - "fire_event_async", - AsyncMock(return_value={}), - ) as fire_event_async_mock: + batch.event, + "fire_event", + MagicMock(return_value={}), + ) as fire_event_mock: await batch.start_batch() - assert fire_event_async_mock.call_args[0] == ( + assert fire_event_mock.call_args[0] == ( { "available_minions": {"foo", "bar"}, "down_minions": set(), @@ -161,12 +161,12 @@ async def test_batch_fire_done_event(batch): batch.done_minions = {"foo"} batch.timedout_minions = {"bar"} with patch.object(batch, "events_channel", MagicMock()), patch.object( - batch.events_channel.master_event, - "fire_event_async", - AsyncMock(return_value={}), - ) as fire_event_async_mock: + batch.event, + "fire_event", + MagicMock(return_value={}), + ) as fire_event_mock: await batch.end_batch() - assert fire_event_async_mock.call_args[0] == ( + assert fire_event_mock.call_args[0] == ( { "available_minions": {"foo", "bar"}, "done_minions": batch.done_minions, @@ -292,20 +292,18 @@ async def test_batch__event_handler_ping_return(batch): async def test_batch__event_handler_call_start_batch_when_all_pings_return(batch): batch.targeted_minions = {"foo"} - future = tornado.gen.Future() - future.set_result({}) with patch.object( batch.events_channel.local_client, "run_job_async", AsyncMock(return_value={"minions": ["foo"]}), ) as local_client_mock, patch.object( - batch, "start_batch", AsyncMock(return_value=future) + batch, "start_batch", AsyncMock(return_value=None) ) as start_batch_mock: asyncio.create_task(batch.start()) - await asyncio.sleep(1) await batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) + await asyncio.sleep(1) start_batch_mock.assert_called_once() @@ -313,20 +311,18 @@ async def test_batch__event_handler_not_call_start_batch_when_not_all_pings_retu batch, ): batch.targeted_minions = {"foo", "bar"} - future = tornado.gen.Future() - future.set_result({}) with patch.object( batch.events_channel.local_client, "run_job_async", AsyncMock(return_value={"minions": ["foo", "bar"]}), ) as local_client_mock, patch.object( - batch, "start_batch", AsyncMock(return_value=future) + batch, "start_batch", AsyncMock(return_value=None) ) as start_batch_mock: asyncio.create_task(batch.start()) - await asyncio.sleep(1) await batch._BatchAsync__event_handler( "salt/job/1234/ret/foo", {"id": "foo"}, "ping_return" ) + await asyncio.sleep(1) start_batch_mock.assert_not_called() From 60a0231fcef6c3f2df89d211ebff3155658e2a0f Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Mon, 15 Jun 2026 14:07:34 -0700 Subject: [PATCH 12/12] Fix test_gather_minions_with_batch_presence_ping KeyError and add docs --- changelog/60269.added.md | 1 + salt/cli/batch.py | 12 +++++++++++- salt/cli/batch_async.py | 2 ++ salt/master.py | 15 ++++++++++++++- tests/pytests/unit/cli/test_batch.py | 1 + 5 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 changelog/60269.added.md diff --git a/changelog/60269.added.md b/changelog/60269.added.md new file mode 100644 index 000000000000..d01e821bbc83 --- /dev/null +++ b/changelog/60269.added.md @@ -0,0 +1 @@ +Added async batch execution support via `BatchAsync` and `batch_async_required`, implementing RFC-0002. Async batch mode activates automatically when the minion count meets a configurable threshold, offloading batch orchestration to the master's event loop and exposing `salt/batch//start` and `salt/batch//done` events. diff --git a/salt/cli/batch.py b/salt/cli/batch.py index 56f45fbfc6c1..bce74cf00b69 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -37,6 +37,8 @@ def get_bnum(opts, minions, quiet): """ Return the active number of minions to maintain + .. versionadded:: 3009.0 + :param dict opts: The salt options dictionary. @@ -46,6 +48,8 @@ def get_bnum(opts, minions, quiet): :param boolean quiet: Suppress the output to the CLI. + :rtype: int or None + Preserves the legacy return values (``None`` for invalid input, ``0`` for an empty minion list with a percentage spec, ``0`` for ``batch=0``) for backward compatibility with any @@ -78,6 +82,8 @@ def batch_get_opts( """ Return the dictionary with batch options populated + .. versionadded:: 3009.0 + :param tgt: Which minions to target for the execution. @@ -105,6 +111,7 @@ def batch_get_opts( :param dict kwargs: Extra keyword arguments. + :rtype: dict """ # We need to re-import salt.utils.args here # even though it has already been imported. @@ -149,9 +156,12 @@ def batch_get_eauth(kwargs): """ Return the dictionary with eauth information + .. versionadded:: 3009.0 + :param dict kwargs: Keyword arguments to extract eauth data from. + :rtype: dict """ eauth = {} if "eauth" in kwargs: @@ -200,7 +210,7 @@ def gather_minions(self): self.opts["tgt"], "test.ping", [], - self.opts.get("batch_presence_ping_timeout", self.opts["timeout"]), + self.opts.get("batch_presence_ping_timeout", self.opts.get("timeout")), ] selected_target_option = self.opts.get("selected_target_option", None) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 949ec196ae04..1f4504002410 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -192,6 +192,8 @@ class BatchAsync: """ Run a job on the targeted minions by using a moving window of fixed size `batch`. + .. versionadded:: 3009.0 + ``BatchAsync`` is used to execute a job on the targeted minions by keeping the number of concurrent running minions to the size of `batch` parameter. diff --git a/salt/master.py b/salt/master.py index 65b4d79a25e3..69abdeb2b18b 100644 --- a/salt/master.py +++ b/salt/master.py @@ -3826,7 +3826,20 @@ def get_token(self, clear_load): async def publish_batch(self, clear_load, minions, missing): """ - This method sends out publications to the minions in case of using batch + Send out publications to the minions when using async batch mode. + + .. versionadded:: 3009.0 + + :param dict clear_load: + The publication payload from the client. + + :param list minions: + The list of matched minion IDs. + + :param list missing: + Minion IDs that were targeted but not found. + + :rtype: dict """ batch_load = {} batch_load.update(clear_load) diff --git a/tests/pytests/unit/cli/test_batch.py b/tests/pytests/unit/cli/test_batch.py index 8a5d061bb889..bed5044abd6c 100644 --- a/tests/pytests/unit/cli/test_batch.py +++ b/tests/pytests/unit/cli/test_batch.py @@ -577,6 +577,7 @@ def test_gather_minions_with_batch_presence_ping(batch): "conf_file": {}, "tgt": "", "transport": "", + "timeout": 10, "gather_job_timeout": 20, } opts_with_pp = {