diff --git a/changelog/68114.fixed.md b/changelog/68114.fixed.md new file mode 100644 index 000000000000..65ccbca1d739 --- /dev/null +++ b/changelog/68114.fixed.md @@ -0,0 +1 @@ +Disconnect non-consuming IPC subscribers after `ipc_write_timeout` seconds, and bound per-subscriber in-flight writes via `ipc_publisher_pending_writes`, so the publisher no longer leaks memory on stalled clients while still letting bursts through to consumers that are momentarily behind. diff --git a/doc/ref/configuration/master.rst b/doc/ref/configuration/master.rst index 74bda344bd43..5e1fe5cef6d3 100644 --- a/doc/ref/configuration/master.rst +++ b/doc/ref/configuration/master.rst @@ -374,22 +374,6 @@ Set the default timeout for the salt command and api. .. conf_master:: loop_interval -.. conf_minion:: ipc_write_timeout - -``ipc_write_timeout`` ---------------------- - -.. versionadded:: 3006.11 - -Default: ``15`` - -How many seconds the event publisher process will wait after a client stops -responding before the client will be disconnected. - -.. code-block:: yaml - - ipc_write_timeout: 15 - ``loop_interval`` ----------------- @@ -1090,6 +1074,46 @@ this option. ipc_write_buffer: 10485760 +.. conf_master:: ipc_write_timeout + +``ipc_write_timeout`` +----------------------- + +Default: ``30`` + +Per-message write timeout (in seconds) for the IPC publisher used by the +master event bus. If a subscriber stops consuming messages, pending writes +to that subscriber will be held in memory by the publisher, which can cause +unbounded memory growth. When a single write exceeds ``ipc_write_timeout``, +the publisher disconnects the offending subscriber to reclaim memory. +Setting this option to ``0`` disables the timeout (legacy behavior). + +.. code-block:: yaml + + ipc_write_timeout: 30 + +.. conf_master:: ipc_publisher_pending_writes + +``ipc_publisher_pending_writes`` +-------------------------------- + +Default: ``10000`` + +Maximum number of in-flight ``stream.write()`` coroutines the IPC publisher +will keep queued per subscriber. Each pending write holds a reference to the +payload being delivered, so an unbounded queue against a non-consuming +subscriber is what causes the master event publisher's memory to grow. When +a subscriber accumulates more than ``ipc_publisher_pending_writes`` pending +writes the publisher skips new messages for it until the queue drains; if +the subscriber never drains, the oldest pending write will eventually +trip ``ipc_write_timeout`` and the subscriber will be dropped. The default +is high enough to absorb legitimate event bursts to consumers that are +momentarily behind. Setting this option to ``0`` disables the bound. + +.. code-block:: yaml + + ipc_publisher_pending_writes: 10000 + .. conf_master:: tcp_master_pub_port ``tcp_master_pub_port`` diff --git a/doc/ref/configuration/minion.rst b/doc/ref/configuration/minion.rst index 17298548815c..55368558ac06 100644 --- a/doc/ref/configuration/minion.rst +++ b/doc/ref/configuration/minion.rst @@ -1544,6 +1544,24 @@ this option. ipc_write_buffer: 10485760 +.. conf_minion:: ipc_write_timeout + +``ipc_write_timeout`` +----------------------- + +Default: ``30`` + +Per-message write timeout (in seconds) for the IPC publisher used by the +event bus. If a subscriber stops consuming messages, pending writes to that +subscriber will be held in memory by the publisher, which can cause +unbounded memory growth. When a single write exceeds ``ipc_write_timeout``, +the publisher disconnects the offending subscriber to reclaim memory. +Setting this option to ``0`` disables the timeout (legacy behavior). + +.. code-block:: yaml + + ipc_write_timeout: 30 + .. conf_minion:: tcp_pub_port ``tcp_pub_port`` diff --git a/salt/config/__init__.py b/salt/config/__init__.py index 86788d5384a8..1afd514ba94d 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -14,6 +14,7 @@ from copy import deepcopy import salt.crypt +import salt.defaults import salt.defaults.exitcodes import salt.exceptions import salt.features @@ -487,6 +488,17 @@ def _gather_buffer_space(): # IPC buffer size # Refs https://github.com/saltstack/salt/issues/34215 "ipc_write_buffer": int, + # Per-message write timeout (seconds) for IPCMessagePublisher; if a + # subscriber does not consume within this window the publisher drops + # it to prevent unbounded memory growth. ``0`` disables the timeout. + # Refs https://github.com/saltstack/salt/issues/68114 + "ipc_write_timeout": int, + # Maximum number of in-flight stream.write() coroutines per IPC + # subscriber before IPCMessagePublisher.publish() starts skipping that + # subscriber. Bounds memory growth from pending writes on backlogged + # consumers while leaving headroom for legitimate bursts. ``0`` + # disables the bound. Refs https://github.com/saltstack/salt/issues/68114 + "ipc_publisher_pending_writes": int, # various subprocess niceness levels "req_server_niceness": (type(None), int), "pub_server_niceness": (type(None), int), @@ -1180,6 +1192,8 @@ def _gather_buffer_space(): "mine_interval": 60, "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, + "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, + "ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES, "ipv6": None, "file_buffer_size": 262144, "tcp_pub_port": 4510, @@ -1506,6 +1520,8 @@ def _gather_buffer_space(): "enforce_mine_cache": False, "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, + "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, + "ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES, # various subprocess niceness levels "req_server_niceness": None, "pub_server_niceness": None, diff --git a/salt/defaults/__init__.py b/salt/defaults/__init__.py index 5ebdb694a58c..ba0038e895bf 100644 --- a/salt/defaults/__init__.py +++ b/salt/defaults/__init__.py @@ -60,3 +60,16 @@ def __repr__(self): cases are proper defaults and are also proper values to pass. """ NOT_SET = _Constant("NOT_SET") + +# Default timeout (seconds) applied to IPCMessagePublisher writes; if a +# subscriber does not consume within this window the publisher drops it to +# prevent unbounded memory growth from pending writes. ``0`` disables the +# timeout (legacy behavior). +IPC_WRITE_TIMEOUT = 30 + +# Maximum number of in-flight stream.write() coroutines per IPC subscriber +# before IPCMessagePublisher.publish() starts skipping that subscriber. This +# bounds memory growth from pending writes on backlogged consumers while +# leaving enough headroom for legitimate bursts. ``0`` disables the bound +# (legacy behavior). +IPC_PUBLISHER_PENDING_WRITES = 10000 diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 2b55cc0e7dfd..c23e13b42b53 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -2,12 +2,14 @@ IPC transport classes """ +import datetime import errno import logging import socket import time import warnings +import salt.defaults import salt.ext.tornado import salt.ext.tornado.concurrent import salt.ext.tornado.gen @@ -508,6 +510,10 @@ def __init__(self, opts, socket_path, io_loop=None): self.io_loop = io_loop or IOLoop.current() self._closing = False self.streams = set() + # Per-stream in-flight _write coroutine counter, used by publish() + # to bound queued writes on backlogged subscribers without dropping + # legitimate burst traffic on a momentarily-slow consumer. + self._pending_writes = {} def start(self): """ @@ -536,8 +542,26 @@ def start(self): @salt.ext.tornado.gen.coroutine def _write(self, stream, pack): + timeout = self.opts.get("ipc_write_timeout", salt.defaults.IPC_WRITE_TIMEOUT) try: - yield stream.write(pack) + if timeout and timeout > 0: + yield salt.ext.tornado.gen.with_timeout( + datetime.timedelta(seconds=timeout), + stream.write(pack), + quiet_exceptions=(StreamClosedError,), + ) + else: + yield stream.write(pack) + except salt.ext.tornado.gen.TimeoutError: + log.warning( + "Non-consuming IPC subscriber on %s exceeded ipc_write_timeout" + " (%ss); dropping it to avoid unbounded memory growth", + self.socket_path, + timeout, + ) + if not stream.closed(): + stream.close() + self.streams.discard(stream) except StreamClosedError: log.trace("Client disconnected from IPC %s", self.socket_path) self.streams.discard(stream) @@ -546,6 +570,12 @@ def _write(self, stream, pack): if not stream.closed(): stream.close() self.streams.discard(stream) + finally: + pending = self._pending_writes.get(stream, 0) - 1 + if pending <= 0: + self._pending_writes.pop(stream, None) + else: + self._pending_writes[stream] = pending def publish(self, msg): """ @@ -555,7 +585,23 @@ def publish(self, msg): return pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True) + hwm = self.opts.get( + "ipc_publisher_pending_writes", + salt.defaults.IPC_PUBLISHER_PENDING_WRITES, + ) for stream in self.streams: + # Bound the number of in-flight _write coroutines per subscriber. + # Each spawn_callback(self._write, ...) holds a reference to + # ``pack`` until the write drains; an unbounded queue on a + # non-consuming subscriber is what causes the publisher RSS to + # balloon. A subscriber that exceeds ``hwm`` pending writes is + # treated as backlogged -- we skip new writes for it, and the + # timeout branch in _write() will eventually close the stream + # and discard it once ipc_write_timeout elapses on the oldest + # pending write. A hwm of 0 disables the bound (legacy behavior). + if hwm and self._pending_writes.get(stream, 0) >= hwm: + continue + self._pending_writes[stream] = self._pending_writes.get(stream, 0) + 1 self.io_loop.spawn_callback(self._write, stream, pack) def handle_connection(self, connection, address): @@ -574,6 +620,7 @@ def handle_connection(self, connection, address): def discard_after_closed(): self.streams.discard(stream) + self._pending_writes.pop(stream, None) stream.set_close_callback(discard_after_closed) except Exception as exc: # pylint: disable=broad-except @@ -591,6 +638,7 @@ def close(self): for stream in self.streams: stream.close() self.streams.clear() + self._pending_writes.clear() if hasattr(self.sock, "close"): self.sock.close() diff --git a/tests/pytests/unit/transport/test_ipc.py b/tests/pytests/unit/transport/test_ipc.py index 12b8ac2be34e..ae70bee8640e 100644 --- a/tests/pytests/unit/transport/test_ipc.py +++ b/tests/pytests/unit/transport/test_ipc.py @@ -1,7 +1,12 @@ +import socket + import pytest from pytestshellutils.utils import ports +import salt.ext.tornado.gen +import salt.ext.tornado.ioloop import salt.ext.tornado.iostream +import salt.transport.frame import salt.transport.ipc import salt.utils.asynchronous import salt.utils.platform @@ -36,6 +41,168 @@ async def test_ipc_connect_sync_wrapped(io_loop, tmp_path): subscriber.connect() +async def test_ipc_publisher_drops_non_consuming_client_68114(io_loop, tmp_path): + """ + Regression for #68114. + + A subscriber that connects to ``IPCMessagePublisher`` but never reads + from the socket used to make ``IPCMessagePublisher._write`` block on + ``stream.write()`` forever -- ``publish()`` kept queueing more + ``spawn_callback(self._write, ...)`` coroutines per message, all of + which awaited a write that would never complete. The pending writes + (plus their referenced payloads) accumulated in the publisher's event + loop, causing the master event publisher's RSS to grow without bound + against a non-consuming IPC client. + + With the fix, ``_write`` applies a configurable ``ipc_write_timeout``. + When a write to a slow/non-consuming subscriber does not complete in + time, the stream is closed and removed from ``self.streams`` so the + publisher reclaims memory and stops spawning new writers for that + client. + """ + if salt.utils.platform.is_windows(): + pytest.skip("IPCMessagePublisher uses unix sockets only on this path") + + socket_path = str(tmp_path / "pub_68114.ipc") + opts = { + # Default in real deployments is 0 (unbounded write buffer); that is + # exactly the case where a non-consuming subscriber leaks memory in + # the publisher because tornado's IOStream never raises + # StreamBufferFullError and the pending writes accumulate. + "ipc_write_buffer": 0, + # Tight timeout so the test does not have to wait long for the drop. + "ipc_write_timeout": 1, + } + + publisher = salt.transport.ipc.IPCMessagePublisher( + opts, socket_path, io_loop=io_loop + ) + publisher.start() + try: + # Non-consuming subscriber: a raw blocking UNIX socket that never reads. + slow = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + slow.setblocking(False) + try: + slow.connect(socket_path) + except BlockingIOError: + pass + + # Let the publisher's accept handler register the new stream. + for _ in range(20): + await salt.ext.tornado.gen.sleep(0.05) + if publisher.streams: + break + assert publisher.streams, "publisher never registered the slow subscriber" + + # Publish enough oversized payloads that the unix-socket send buffer + # is guaranteed to back up; with ipc_write_buffer=0 tornado's + # IOStream will never raise StreamBufferFullError, so before the fix + # the _write coroutines for the slow client awaited stream.write() + # forever and accumulated in the event loop. + big_payload = "x" * (256 * 1024) + for _ in range(64): + publisher.publish(big_payload) + + # Wait for the publisher's write timeout to fire and drop the slow + # stream. ipc_write_timeout=1 so a few seconds is plenty. + for _ in range(60): + await salt.ext.tornado.gen.sleep(0.1) + if not publisher.streams: + break + + assert not publisher.streams, ( + "publisher did not drop a non-consuming subscriber after" + " ipc_write_timeout elapsed" + ) + + try: + slow.close() + except OSError: + pass + finally: + publisher.close() + + +async def test_ipc_publisher_pending_writes_bounded_for_stuck_subscriber( + io_loop, tmp_path +): + """ + Regression for #68114 follow-up. + + ``IPCMessagePublisher.publish()`` must bound the number of in-flight + ``_write`` coroutines per subscriber so that a non-consuming subscriber + cannot keep growing the publisher's memory footprint. The first cut of + the fix skipped any stream with ``_write_buffer_size > 0`` which was + too aggressive -- a consuming subscriber that briefly fell behind on a + burst (e.g. ``test_event_many_backlog``) would have legitimate events + silently dropped. The fix replaces that predicate with a per-stream + pending-write counter capped by ``ipc_publisher_pending_writes`` so: + + * legitimate bursts under the cap go through; and + * a truly stuck subscriber cannot accumulate more than the cap of + pending writes (each of which holds a payload reference). + """ + if salt.utils.platform.is_windows(): + pytest.skip("IPCMessagePublisher uses unix sockets only on this path") + + socket_path = str(tmp_path / "pub_pending.ipc") + opts = { + "ipc_write_buffer": 0, + # Long enough that the timeout branch does not fire during the + # bounded portion of the test -- we want to exercise the HWM. + "ipc_write_timeout": 30, + "ipc_publisher_pending_writes": 8, + } + + publisher = salt.transport.ipc.IPCMessagePublisher( + opts, socket_path, io_loop=io_loop + ) + publisher.start() + try: + slow = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + slow.setblocking(False) + try: + slow.connect(socket_path) + except BlockingIOError: + pass + + for _ in range(20): + await salt.ext.tornado.gen.sleep(0.05) + if publisher.streams: + break + assert publisher.streams, "publisher never registered the slow subscriber" + + stream = next(iter(publisher.streams)) + + # Publish a heavy burst of large payloads against the non-consuming + # subscriber. Each payload is big enough that tornado's IOStream + # cannot drain it to the (non-reading) UNIX socket immediately, so + # the per-stream pending-write counter accumulates. + big_payload = "x" * (256 * 1024) + for _ in range(64): + publisher.publish(big_payload) + # Yield so the spawned _write coroutines actually start. + await salt.ext.tornado.gen.sleep(0) + + # The per-stream pending count must never exceed the configured cap. + assert ( + publisher._pending_writes.get(stream, 0) + <= opts["ipc_publisher_pending_writes"] + ), ( + "publisher accumulated more pending writes than" + f" ipc_publisher_pending_writes={opts['ipc_publisher_pending_writes']!r}" + f" for a non-consuming subscriber:" + f" {publisher._pending_writes.get(stream, 0)}" + ) + + try: + slow.close() + except OSError: + pass + finally: + publisher.close() + + async def test_ipc_client_connect_after_close_no_attribute_error(io_loop, tmp_path): """ Regression for #68993.