Skip to content
1 change: 1 addition & 0 deletions changelog/60269.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added async batch execution support via `BatchAsync` and `batch_async_required`, implementing RFC-0002. Async batch mode activates automatically when the minion count meets a configurable threshold, offloading batch orchestration to the master's event loop and exposing `salt/batch/<jid>/start` and `salt/batch/<jid>/done` events.
2 changes: 2 additions & 0 deletions salt/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
"print_event",
"raw",
"yield_pub_data",
"batch",
"batch_delay",
]
)

Expand Down
179 changes: 148 additions & 31 deletions salt/cli/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,148 @@
log = logging.getLogger(__name__)


def get_bnum(opts, minions, quiet):
"""
Return the active number of minions to maintain

.. versionadded:: 3009.0

:param dict opts:
The salt options dictionary.

:param minions:
The list of the minions to perform the calculation.

:param boolean quiet:
Suppress the output to the CLI.

:rtype: int or None

Preserves the legacy return values (``None`` for invalid
input, ``0`` for an empty minion list with a percentage spec,
``0`` for ``batch=0``) for backward compatibility with any
callers that rely on them. The shared state machine uses the
hardened :func:`salt.utils.batch_state.get_batch_size` which
always returns at least 1.
"""

def partition(x):
return float(x) / 100.0 * len(minions)

try:
if isinstance(opts["batch"], str) and "%" in opts["batch"]:
res = partition(float(opts["batch"].strip("%")))
if res < 1:
return int(math.ceil(res))
return int(res)
return int(opts["batch"])
except ValueError:
if not quiet:
salt.utils.stringutils.print_cli(
"Invalid batch data sent: {}\nData must be in the "
"form of %10, 10% or 3".format(opts["batch"])
)


def batch_get_opts(
tgt, fun, batch, parent_opts, arg=(), tgt_type="glob", ret="", kwarg=None, **kwargs
):
"""
Return the dictionary with batch options populated

.. versionadded:: 3009.0

:param tgt:
Which minions to target for the execution.

:param str fun:
The function to run.

:param batch:
The batch size.

:param dict parent_opts:
The salt options dictionary.

:param list arg:
The arguments to put to the resulting ``arg`` key of resulting dictionary.

:param str tgt_type:
Default ``glob``. Target type to use with ``tgt``.

:param ret:
``ret`` parameter to put to the resulting dictionary.

:param dict kwarg:
Extra arguments to put to the resulting ``arg`` key of resulting dictionary.

:param dict kwargs:
Extra keyword arguments.

:rtype: dict
"""
# We need to re-import salt.utils.args here
# even though it has already been imported.
# when cmd_batch is called via the NetAPI
# the module is unavailable.
import salt.utils.args

arg = salt.utils.args.condition_input(arg, kwarg)
opts = {
"tgt": tgt,
"fun": fun,
"arg": arg,
"tgt_type": tgt_type,
"ret": ret,
"batch": batch,
"failhard": kwargs.get("failhard", parent_opts.get("failhard", False)),
"raw": kwargs.get("raw", False),
}

if "timeout" in kwargs:
opts["timeout"] = kwargs["timeout"]
if "gather_job_timeout" in kwargs:
opts["gather_job_timeout"] = kwargs["gather_job_timeout"]
if "batch_wait" in kwargs:
opts["batch_wait"] = int(kwargs["batch_wait"])

for key, val in parent_opts.items():
if key not in opts:
opts[key] = val

opts["batch_presence_ping_timeout"] = kwargs.get(
"batch_presence_ping_timeout", opts["timeout"]
)
opts["batch_presence_ping_gather_job_timeout"] = kwargs.get(
"batch_presence_ping_gather_job_timeout", opts["gather_job_timeout"]
)

return opts


def batch_get_eauth(kwargs):
"""
Return the dictionary with eauth information

.. versionadded:: 3009.0

:param dict kwargs:
Keyword arguments to extract eauth data from.

:rtype: dict
"""
eauth = {}
if "eauth" in kwargs:
eauth["eauth"] = kwargs.pop("eauth")
if "username" in kwargs:
eauth["username"] = kwargs.pop("username")
if "password" in kwargs:
eauth["password"] = kwargs.pop("password")
if "token" in kwargs:
eauth["token"] = kwargs.pop("token")
return eauth


class Batch:
"""
Manage the execution of batch runs.
Expand All @@ -55,6 +197,7 @@ def __init__(self, opts, eauth=None, quiet=False, _parser=None):
self.pub_kwargs = eauth if eauth else {}
self.quiet = quiet
self.options = _parser
self.minions = set()
# Passing listen True to local client will prevent it from purging
# cached events while iterating over the batches.
self.local = salt.client.get_local_client(opts["conf_file"], listen=True)
Expand All @@ -67,7 +210,7 @@ def gather_minions(self):
self.opts["tgt"],
"test.ping",
[],
self.opts["timeout"],
self.opts.get("batch_presence_ping_timeout", self.opts.get("timeout")),
]

selected_target_option = self.opts.get("selected_target_option", None)
Expand All @@ -79,7 +222,10 @@ def gather_minions(self):
self.pub_kwargs["yield_pub_data"] = True
ping_gen = self.local.cmd_iter(
*args,
gather_job_timeout=self.opts["gather_job_timeout"],
gather_job_timeout=self.opts.get(
"batch_presence_ping_gather_job_timeout",
self.opts["gather_job_timeout"],
),
**self.pub_kwargs,
)

Expand Down Expand Up @@ -110,35 +256,6 @@ def gather_minions(self):

return (list(fret), ping_gen, nret.difference(fret))

def get_bnum(self):
"""
Return the active number of minions to maintain.

Preserves the legacy return values (``None`` for invalid
input, ``0`` for an empty minion list with a percentage spec,
``0`` for ``batch=0``) for backward compatibility with any
callers that rely on them. The shared state machine uses the
hardened :func:`salt.utils.batch_state.get_batch_size` which
always returns at least 1.
"""

def partition(x):
return float(x) / 100.0 * len(self.minions)

try:
if isinstance(self.opts["batch"], str) and "%" in self.opts["batch"]:
res = partition(float(self.opts["batch"].strip("%")))
if res < 1:
return int(math.ceil(res))
return int(res)
return int(self.opts["batch"])
except ValueError:
if not self.quiet:
salt.utils.stringutils.print_cli(
"Invalid batch data sent: {}\nData must be in the "
"form of %10, 10% or 3".format(self.opts["batch"])
)

def run(self):
"""
Execute the batch run.
Expand Down
Loading
Loading