Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion salt/engines/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import salt.utils.reactor


def start(refresh_interval=None, worker_threads=None, worker_hwm=None):
def start(
refresh_interval=None,
worker_threads=None,
worker_hwm=None,
cleanup_interval=None,
):
if refresh_interval is not None:
__opts__["reactor_refresh_interval"] = refresh_interval
if worker_threads is not None:
__opts__["reactor_worker_threads"] = worker_threads
if worker_hwm is not None:
__opts__["reactor_worker_hwm"] = worker_hwm
if cleanup_interval is not None:
__opts__["reactor_cleanup_interval"] = cleanup_interval

salt.utils.reactor.Reactor(__opts__).run()
28 changes: 28 additions & 0 deletions salt/runners/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,31 @@ def set_leader(value=True):

res = sevent.get_event(wait=30, tag="salt/reactors/manage/leader/value")
return res["result"]


def cleanup():
"""
Request cached reactor clients to be cleaned up.

CLI Example:

.. code-block:: bash

salt-run reactor.cleanup
"""
if not _reactor_system_available():
raise CommandExecutionError("Reactor system is not running.")

with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
opts=__opts__,
listen=True,
) as sevent:

master_key = salt.utils.master.get_master_key("root", __opts__)

__jid_event__.fire_event({"key": master_key}, "salt/reactors/manage/cleanup")

res = sevent.get_event(wait=30, tag="salt/reactors/manage/cleanup-complete")
return res.get("result")
37 changes: 37 additions & 0 deletions salt/utils/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import glob
import logging
import os
import time

import salt.client
import salt.defaults.exitcodes
Expand Down Expand Up @@ -273,6 +274,12 @@ def run(self):
{"reactors": self.list_all()},
"salt/reactors/manage/list-results",
)
elif data["tag"].endswith("salt/reactors/manage/cleanup"):
res = self.wrap.cleanup()
event.fire_event(
{"result": res},
"salt/reactors/manage/cleanup-complete",
)
else:
# do not handle any reactions if not leader in cluster
if not self.is_leader:
Expand All @@ -287,6 +294,7 @@ def run(self):
self.call_reactions(chunks)
except SystemExit:
log.warning("Exit ignored by reactor")
self.wrap.maybe_cleanup()


class ReactWrap:
Expand Down Expand Up @@ -316,6 +324,35 @@ def __init__(self, opts):
self.opts["reactor_worker_threads"], # number of workers for runner/wheel
queue_size=self.opts["reactor_worker_hwm"], # queue size for those workers
)
self.cleanup_interval = self.opts.get(
"reactor_cleanup_interval", self.opts["reactor_refresh_interval"]
)
self._next_cleanup = None
if self.cleanup_interval:
self._next_cleanup = time.monotonic() + self.cleanup_interval

def maybe_cleanup(self):
"""
Periodically release cached clients to avoid retaining stale references.
"""
if not self.cleanup_interval:
return None
now = time.monotonic()
if self._next_cleanup is None or now < self._next_cleanup:
return None
self._next_cleanup = now + self.cleanup_interval
return self.cleanup()

def cleanup(self):
"""
Force cache expiry checks and return cleanup details.
"""
removed_clients = []
if isinstance(self.client_cache, salt.utils.cache.CacheDict):
for client_type in list(self.client_cache.keys()):
if client_type not in self.client_cache:
removed_clients.append(client_type)
return {"removed_clients": removed_clients}

def populate_client_cache(self, low):
"""
Expand Down
14 changes: 14 additions & 0 deletions tests/pytests/unit/utils/test_reactor2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import salt.loader
import salt.template
import salt.utils.cache
import salt.utils.data
import salt.utils.files
import salt.utils.reactor as reactor
Expand Down Expand Up @@ -583,3 +584,16 @@ def test_client_cache_missing_key(file_client, react_wrap):
file_client_key = key

assert file_client_key == f"{file_client}"


def test_react_wrap_cleanup_expires_cache(monkeypatch, react_wrap):
client_cache = salt.utils.cache.CacheDict(1)
client_cache["runner"] = Mock()
react_wrap.client_cache = client_cache

now = 1000.0
monkeypatch.setattr(salt.utils.cache.time, "time", lambda: now + 5)

result = react_wrap.cleanup()

assert result["removed_clients"] == ["runner"]