From 49dd8c4b2882e6662da47fb4cb4c1b75b3a59029 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Thu, 11 Jun 2026 17:52:52 -0700 Subject: [PATCH 1/3] Disconnect non-consuming IPC subscribers to stop publisher memory growth IPCMessagePublisher.publish() spawned an unbounded number of stream.write() coroutines per subscriber per message. If a subscriber stopped consuming, every following publish queued another _write coroutine that awaited a stream.write() that would never complete; the pending writes (and their referenced payloads) accumulated in the publisher's ioloop and inflated the master EventPublisher's RSS without bound -- the "non consuming IPC clients" leak. Wrap _write() with the new ipc_write_timeout opt (default 30s, 0 disables) via salt.ext.tornado.gen.with_timeout. On timeout, close the stream and discard it from self.streams so the publisher reclaims memory and stops spawning writers for that client. publish() now also skips streams that still have a pending write buffer, which prevents piling up more coroutines on a slow subscriber before the timeout fires. Documented on master and minion config pages and added a changelog fragment. Fixes #68114 --- changelog/68114.fixed.md | 1 + doc/ref/configuration/master.rst | 18 +++++ doc/ref/configuration/minion.rst | 18 +++++ salt/config/__init__.py | 8 +++ salt/defaults/__init__.py | 6 ++ salt/transport/ipc.py | 29 +++++++- tests/pytests/unit/transport/test_ipc.py | 87 ++++++++++++++++++++++++ 7 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 changelog/68114.fixed.md diff --git a/changelog/68114.fixed.md b/changelog/68114.fixed.md new file mode 100644 index 000000000000..d6026d055951 --- /dev/null +++ b/changelog/68114.fixed.md @@ -0,0 +1 @@ +Disconnect non-consuming IPC subscribers after `ipc_write_timeout` seconds so the publisher no longer leaks memory on stalled clients. diff --git a/doc/ref/configuration/master.rst b/doc/ref/configuration/master.rst index 74bda344bd43..d593e867bb46 100644 --- a/doc/ref/configuration/master.rst +++ b/doc/ref/configuration/master.rst @@ -1090,6 +1090,24 @@ this option. ipc_write_buffer: 10485760 +.. conf_master:: ipc_write_timeout + +``ipc_write_timeout`` +----------------------- + +Default: ``30`` + +Per-message write timeout (in seconds) for the IPC publisher used by the +master event bus. If a subscriber stops consuming messages, pending writes +to that subscriber will be held in memory by the publisher, which can cause +unbounded memory growth. When a single write exceeds ``ipc_write_timeout``, +the publisher disconnects the offending subscriber to reclaim memory. +Setting this option to ``0`` disables the timeout (legacy behavior). + +.. code-block:: yaml + + ipc_write_timeout: 30 + .. conf_master:: tcp_master_pub_port ``tcp_master_pub_port`` diff --git a/doc/ref/configuration/minion.rst b/doc/ref/configuration/minion.rst index 17298548815c..55368558ac06 100644 --- a/doc/ref/configuration/minion.rst +++ b/doc/ref/configuration/minion.rst @@ -1544,6 +1544,24 @@ this option. ipc_write_buffer: 10485760 +.. conf_minion:: ipc_write_timeout + +``ipc_write_timeout`` +----------------------- + +Default: ``30`` + +Per-message write timeout (in seconds) for the IPC publisher used by the +event bus. If a subscriber stops consuming messages, pending writes to that +subscriber will be held in memory by the publisher, which can cause +unbounded memory growth. When a single write exceeds ``ipc_write_timeout``, +the publisher disconnects the offending subscriber to reclaim memory. +Setting this option to ``0`` disables the timeout (legacy behavior). + +.. code-block:: yaml + + ipc_write_timeout: 30 + .. conf_minion:: tcp_pub_port ``tcp_pub_port`` diff --git a/salt/config/__init__.py b/salt/config/__init__.py index 86788d5384a8..d2eb3058a3a6 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -14,6 +14,7 @@ from copy import deepcopy import salt.crypt +import salt.defaults import salt.defaults.exitcodes import salt.exceptions import salt.features @@ -487,6 +488,11 @@ def _gather_buffer_space(): # IPC buffer size # Refs https://github.com/saltstack/salt/issues/34215 "ipc_write_buffer": int, + # Per-message write timeout (seconds) for IPCMessagePublisher; if a + # subscriber does not consume within this window the publisher drops + # it to prevent unbounded memory growth. ``0`` disables the timeout. + # Refs https://github.com/saltstack/salt/issues/68114 + "ipc_write_timeout": int, # various subprocess niceness levels "req_server_niceness": (type(None), int), "pub_server_niceness": (type(None), int), @@ -1180,6 +1186,7 @@ def _gather_buffer_space(): "mine_interval": 60, "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, + "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, "ipv6": None, "file_buffer_size": 262144, "tcp_pub_port": 4510, @@ -1506,6 +1513,7 @@ def _gather_buffer_space(): "enforce_mine_cache": False, "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, + "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, # various subprocess niceness levels "req_server_niceness": None, "pub_server_niceness": None, diff --git a/salt/defaults/__init__.py b/salt/defaults/__init__.py index 5ebdb694a58c..2b2cc3627763 100644 --- a/salt/defaults/__init__.py +++ b/salt/defaults/__init__.py @@ -60,3 +60,9 @@ def __repr__(self): cases are proper defaults and are also proper values to pass. """ NOT_SET = _Constant("NOT_SET") + +# Default timeout (seconds) applied to IPCMessagePublisher writes; if a +# subscriber does not consume within this window the publisher drops it to +# prevent unbounded memory growth from pending writes. ``0`` disables the +# timeout (legacy behavior). +IPC_WRITE_TIMEOUT = 30 diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 2b55cc0e7dfd..74593c3cd727 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -2,12 +2,14 @@ IPC transport classes """ +import datetime import errno import logging import socket import time import warnings +import salt.defaults import salt.ext.tornado import salt.ext.tornado.concurrent import salt.ext.tornado.gen @@ -536,8 +538,26 @@ def start(self): @salt.ext.tornado.gen.coroutine def _write(self, stream, pack): + timeout = self.opts.get("ipc_write_timeout", salt.defaults.IPC_WRITE_TIMEOUT) try: - yield stream.write(pack) + if timeout and timeout > 0: + yield salt.ext.tornado.gen.with_timeout( + datetime.timedelta(seconds=timeout), + stream.write(pack), + quiet_exceptions=(StreamClosedError,), + ) + else: + yield stream.write(pack) + except salt.ext.tornado.gen.TimeoutError: + log.warning( + "Non-consuming IPC subscriber on %s exceeded ipc_write_timeout" + " (%ss); dropping it to avoid unbounded memory growth", + self.socket_path, + timeout, + ) + if not stream.closed(): + stream.close() + self.streams.discard(stream) except StreamClosedError: log.trace("Client disconnected from IPC %s", self.socket_path) self.streams.discard(stream) @@ -556,6 +576,13 @@ def publish(self, msg): pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True) for stream in self.streams: + # Skip streams that already have a pending write -- piling more + # spawn_callback(self._write, ...) coroutines on a non-consuming + # subscriber is what causes the publisher RSS to balloon. The + # timeout branch in _write() will close the stream and discard + # it from self.streams once ipc_write_timeout elapses. + if getattr(stream, "_write_buffer_size", 0): + continue self.io_loop.spawn_callback(self._write, stream, pack) def handle_connection(self, connection, address): diff --git a/tests/pytests/unit/transport/test_ipc.py b/tests/pytests/unit/transport/test_ipc.py index 12b8ac2be34e..e3f00207adc4 100644 --- a/tests/pytests/unit/transport/test_ipc.py +++ b/tests/pytests/unit/transport/test_ipc.py @@ -1,7 +1,12 @@ +import socket + import pytest from pytestshellutils.utils import ports +import salt.ext.tornado.gen +import salt.ext.tornado.ioloop import salt.ext.tornado.iostream +import salt.transport.frame import salt.transport.ipc import salt.utils.asynchronous import salt.utils.platform @@ -36,6 +41,88 @@ async def test_ipc_connect_sync_wrapped(io_loop, tmp_path): subscriber.connect() +async def test_ipc_publisher_drops_non_consuming_client_68114(io_loop, tmp_path): + """ + Regression for #68114. + + A subscriber that connects to ``IPCMessagePublisher`` but never reads + from the socket used to make ``IPCMessagePublisher._write`` block on + ``stream.write()`` forever -- ``publish()`` kept queueing more + ``spawn_callback(self._write, ...)`` coroutines per message, all of + which awaited a write that would never complete. The pending writes + (plus their referenced payloads) accumulated in the publisher's event + loop, causing the master event publisher's RSS to grow without bound + against a non-consuming IPC client. + + With the fix, ``_write`` applies a configurable ``ipc_write_timeout``. + When a write to a slow/non-consuming subscriber does not complete in + time, the stream is closed and removed from ``self.streams`` so the + publisher reclaims memory and stops spawning new writers for that + client. + """ + if salt.utils.platform.is_windows(): + pytest.skip("IPCMessagePublisher uses unix sockets only on this path") + + socket_path = str(tmp_path / "pub_68114.ipc") + opts = { + # Default in real deployments is 0 (unbounded write buffer); that is + # exactly the case where a non-consuming subscriber leaks memory in + # the publisher because tornado's IOStream never raises + # StreamBufferFullError and the pending writes accumulate. + "ipc_write_buffer": 0, + # Tight timeout so the test does not have to wait long for the drop. + "ipc_write_timeout": 1, + } + + publisher = salt.transport.ipc.IPCMessagePublisher( + opts, socket_path, io_loop=io_loop + ) + publisher.start() + try: + # Non-consuming subscriber: a raw blocking UNIX socket that never reads. + slow = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + slow.setblocking(False) + try: + slow.connect(socket_path) + except BlockingIOError: + pass + + # Let the publisher's accept handler register the new stream. + for _ in range(20): + await salt.ext.tornado.gen.sleep(0.05) + if publisher.streams: + break + assert publisher.streams, "publisher never registered the slow subscriber" + + # Publish enough oversized payloads that the unix-socket send buffer + # is guaranteed to back up; with ipc_write_buffer=0 tornado's + # IOStream will never raise StreamBufferFullError, so before the fix + # the _write coroutines for the slow client awaited stream.write() + # forever and accumulated in the event loop. + big_payload = "x" * (256 * 1024) + for _ in range(64): + publisher.publish(big_payload) + + # Wait for the publisher's write timeout to fire and drop the slow + # stream. ipc_write_timeout=1 so a few seconds is plenty. + for _ in range(60): + await salt.ext.tornado.gen.sleep(0.1) + if not publisher.streams: + break + + assert not publisher.streams, ( + "publisher did not drop a non-consuming subscriber after" + " ipc_write_timeout elapsed" + ) + + try: + slow.close() + except OSError: + pass + finally: + publisher.close() + + async def test_ipc_client_connect_after_close_no_attribute_error(io_loop, tmp_path): """ Regression for #68993. From 726290821bc5ef49be421626ceaa96e8b924ca5f Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 12 Jun 2026 17:30:17 -0700 Subject: [PATCH 2/3] Remove stale ipc_write_timeout conf_minion directive from master.rst A stale `.. conf_minion:: ipc_write_timeout` block (Default: 15, versionadded:: 3006.11) was inherited from origin/3006.x as a carryover from the reverted 4c0e5529f5d attempt. It collides with this PR's new `.. conf_minion:: ipc_write_timeout` entry in minion.rst, causing Sphinx -W to abort with a duplicate description warning and cascading failures across the docs build and 28+ dependent CI jobs. The correct master-side entry already exists at master.rst:1077 (`.. conf_master:: ipc_write_timeout`, Default: 30) matching salt.defaults.IPC_WRITE_TIMEOUT. Remove only the stale minion directive block; no other changes. --- doc/ref/configuration/master.rst | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/doc/ref/configuration/master.rst b/doc/ref/configuration/master.rst index d593e867bb46..a0e78a51f6c5 100644 --- a/doc/ref/configuration/master.rst +++ b/doc/ref/configuration/master.rst @@ -374,22 +374,6 @@ Set the default timeout for the salt command and api. .. conf_master:: loop_interval -.. conf_minion:: ipc_write_timeout - -``ipc_write_timeout`` ---------------------- - -.. versionadded:: 3006.11 - -Default: ``15`` - -How many seconds the event publisher process will wait after a client stops -responding before the client will be disconnected. - -.. code-block:: yaml - - ipc_write_timeout: 15 - ``loop_interval`` ----------------- From 8df03353a26b603609f8a4205c4a726e974dea69 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 13 Jun 2026 17:46:38 -0700 Subject: [PATCH 3/3] Bound IPC publisher pending writes per-stream instead of skipping on any buffered byte The first cut of the #68114 leak fix skipped publish() for any stream with _write_buffer_size > 0. That predicate fires on a consumer that is only momentarily behind during an event burst, so tests/pytests/unit/utils/event/ test_event.py::test_event_many_backlog (500-event burst) lost event #279 deterministically on Photon 5 Arm64 FIPS and Rocky 8 Arm64. Replace the buffer-byte skip with a per-stream counter of in-flight _write coroutines, capped by a new ipc_publisher_pending_writes opt (default 10000). Each pending write holds a payload reference; bounding the count is what actually prevents the leak. Consumers that are momentarily behind get the full burst, while a truly non-consuming subscriber stops accumulating new writes once the cap is reached -- the oldest pending write then trips ipc_write_timeout and the subscriber is dropped (existing behavior). Add a regression test that publishes a heavy burst at a stuck subscriber and asserts the per-stream pending count never exceeds the configured cap. The existing #68114 leak-drop test still passes, confirming the memory fix's intent is preserved. Refs #68114 --- changelog/68114.fixed.md | 2 +- doc/ref/configuration/master.rst | 22 +++++++ salt/config/__init__.py | 8 +++ salt/defaults/__init__.py | 7 +++ salt/transport/ipc.py | 33 ++++++++-- tests/pytests/unit/transport/test_ipc.py | 80 ++++++++++++++++++++++++ 6 files changed, 145 insertions(+), 7 deletions(-) diff --git a/changelog/68114.fixed.md b/changelog/68114.fixed.md index d6026d055951..65ccbca1d739 100644 --- a/changelog/68114.fixed.md +++ b/changelog/68114.fixed.md @@ -1 +1 @@ -Disconnect non-consuming IPC subscribers after `ipc_write_timeout` seconds so the publisher no longer leaks memory on stalled clients. +Disconnect non-consuming IPC subscribers after `ipc_write_timeout` seconds, and bound per-subscriber in-flight writes via `ipc_publisher_pending_writes`, so the publisher no longer leaks memory on stalled clients while still letting bursts through to consumers that are momentarily behind. diff --git a/doc/ref/configuration/master.rst b/doc/ref/configuration/master.rst index a0e78a51f6c5..5e1fe5cef6d3 100644 --- a/doc/ref/configuration/master.rst +++ b/doc/ref/configuration/master.rst @@ -1092,6 +1092,28 @@ Setting this option to ``0`` disables the timeout (legacy behavior). ipc_write_timeout: 30 +.. conf_master:: ipc_publisher_pending_writes + +``ipc_publisher_pending_writes`` +-------------------------------- + +Default: ``10000`` + +Maximum number of in-flight ``stream.write()`` coroutines the IPC publisher +will keep queued per subscriber. Each pending write holds a reference to the +payload being delivered, so an unbounded queue against a non-consuming +subscriber is what causes the master event publisher's memory to grow. When +a subscriber accumulates more than ``ipc_publisher_pending_writes`` pending +writes the publisher skips new messages for it until the queue drains; if +the subscriber never drains, the oldest pending write will eventually +trip ``ipc_write_timeout`` and the subscriber will be dropped. The default +is high enough to absorb legitimate event bursts to consumers that are +momentarily behind. Setting this option to ``0`` disables the bound. + +.. code-block:: yaml + + ipc_publisher_pending_writes: 10000 + .. conf_master:: tcp_master_pub_port ``tcp_master_pub_port`` diff --git a/salt/config/__init__.py b/salt/config/__init__.py index d2eb3058a3a6..1afd514ba94d 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -493,6 +493,12 @@ def _gather_buffer_space(): # it to prevent unbounded memory growth. ``0`` disables the timeout. # Refs https://github.com/saltstack/salt/issues/68114 "ipc_write_timeout": int, + # Maximum number of in-flight stream.write() coroutines per IPC + # subscriber before IPCMessagePublisher.publish() starts skipping that + # subscriber. Bounds memory growth from pending writes on backlogged + # consumers while leaving headroom for legitimate bursts. ``0`` + # disables the bound. Refs https://github.com/saltstack/salt/issues/68114 + "ipc_publisher_pending_writes": int, # various subprocess niceness levels "req_server_niceness": (type(None), int), "pub_server_niceness": (type(None), int), @@ -1187,6 +1193,7 @@ def _gather_buffer_space(): "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, + "ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES, "ipv6": None, "file_buffer_size": 262144, "tcp_pub_port": 4510, @@ -1514,6 +1521,7 @@ def _gather_buffer_space(): "ipc_mode": _DFLT_IPC_MODE, "ipc_write_buffer": _DFLT_IPC_WBUFFER, "ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT, + "ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES, # various subprocess niceness levels "req_server_niceness": None, "pub_server_niceness": None, diff --git a/salt/defaults/__init__.py b/salt/defaults/__init__.py index 2b2cc3627763..ba0038e895bf 100644 --- a/salt/defaults/__init__.py +++ b/salt/defaults/__init__.py @@ -66,3 +66,10 @@ def __repr__(self): # prevent unbounded memory growth from pending writes. ``0`` disables the # timeout (legacy behavior). IPC_WRITE_TIMEOUT = 30 + +# Maximum number of in-flight stream.write() coroutines per IPC subscriber +# before IPCMessagePublisher.publish() starts skipping that subscriber. This +# bounds memory growth from pending writes on backlogged consumers while +# leaving enough headroom for legitimate bursts. ``0`` disables the bound +# (legacy behavior). +IPC_PUBLISHER_PENDING_WRITES = 10000 diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 74593c3cd727..c23e13b42b53 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -510,6 +510,10 @@ def __init__(self, opts, socket_path, io_loop=None): self.io_loop = io_loop or IOLoop.current() self._closing = False self.streams = set() + # Per-stream in-flight _write coroutine counter, used by publish() + # to bound queued writes on backlogged subscribers without dropping + # legitimate burst traffic on a momentarily-slow consumer. + self._pending_writes = {} def start(self): """ @@ -566,6 +570,12 @@ def _write(self, stream, pack): if not stream.closed(): stream.close() self.streams.discard(stream) + finally: + pending = self._pending_writes.get(stream, 0) - 1 + if pending <= 0: + self._pending_writes.pop(stream, None) + else: + self._pending_writes[stream] = pending def publish(self, msg): """ @@ -575,14 +585,23 @@ def publish(self, msg): return pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True) + hwm = self.opts.get( + "ipc_publisher_pending_writes", + salt.defaults.IPC_PUBLISHER_PENDING_WRITES, + ) for stream in self.streams: - # Skip streams that already have a pending write -- piling more - # spawn_callback(self._write, ...) coroutines on a non-consuming - # subscriber is what causes the publisher RSS to balloon. The - # timeout branch in _write() will close the stream and discard - # it from self.streams once ipc_write_timeout elapses. - if getattr(stream, "_write_buffer_size", 0): + # Bound the number of in-flight _write coroutines per subscriber. + # Each spawn_callback(self._write, ...) holds a reference to + # ``pack`` until the write drains; an unbounded queue on a + # non-consuming subscriber is what causes the publisher RSS to + # balloon. A subscriber that exceeds ``hwm`` pending writes is + # treated as backlogged -- we skip new writes for it, and the + # timeout branch in _write() will eventually close the stream + # and discard it once ipc_write_timeout elapses on the oldest + # pending write. A hwm of 0 disables the bound (legacy behavior). + if hwm and self._pending_writes.get(stream, 0) >= hwm: continue + self._pending_writes[stream] = self._pending_writes.get(stream, 0) + 1 self.io_loop.spawn_callback(self._write, stream, pack) def handle_connection(self, connection, address): @@ -601,6 +620,7 @@ def handle_connection(self, connection, address): def discard_after_closed(): self.streams.discard(stream) + self._pending_writes.pop(stream, None) stream.set_close_callback(discard_after_closed) except Exception as exc: # pylint: disable=broad-except @@ -618,6 +638,7 @@ def close(self): for stream in self.streams: stream.close() self.streams.clear() + self._pending_writes.clear() if hasattr(self.sock, "close"): self.sock.close() diff --git a/tests/pytests/unit/transport/test_ipc.py b/tests/pytests/unit/transport/test_ipc.py index e3f00207adc4..ae70bee8640e 100644 --- a/tests/pytests/unit/transport/test_ipc.py +++ b/tests/pytests/unit/transport/test_ipc.py @@ -123,6 +123,86 @@ async def test_ipc_publisher_drops_non_consuming_client_68114(io_loop, tmp_path) publisher.close() +async def test_ipc_publisher_pending_writes_bounded_for_stuck_subscriber( + io_loop, tmp_path +): + """ + Regression for #68114 follow-up. + + ``IPCMessagePublisher.publish()`` must bound the number of in-flight + ``_write`` coroutines per subscriber so that a non-consuming subscriber + cannot keep growing the publisher's memory footprint. The first cut of + the fix skipped any stream with ``_write_buffer_size > 0`` which was + too aggressive -- a consuming subscriber that briefly fell behind on a + burst (e.g. ``test_event_many_backlog``) would have legitimate events + silently dropped. The fix replaces that predicate with a per-stream + pending-write counter capped by ``ipc_publisher_pending_writes`` so: + + * legitimate bursts under the cap go through; and + * a truly stuck subscriber cannot accumulate more than the cap of + pending writes (each of which holds a payload reference). + """ + if salt.utils.platform.is_windows(): + pytest.skip("IPCMessagePublisher uses unix sockets only on this path") + + socket_path = str(tmp_path / "pub_pending.ipc") + opts = { + "ipc_write_buffer": 0, + # Long enough that the timeout branch does not fire during the + # bounded portion of the test -- we want to exercise the HWM. + "ipc_write_timeout": 30, + "ipc_publisher_pending_writes": 8, + } + + publisher = salt.transport.ipc.IPCMessagePublisher( + opts, socket_path, io_loop=io_loop + ) + publisher.start() + try: + slow = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + slow.setblocking(False) + try: + slow.connect(socket_path) + except BlockingIOError: + pass + + for _ in range(20): + await salt.ext.tornado.gen.sleep(0.05) + if publisher.streams: + break + assert publisher.streams, "publisher never registered the slow subscriber" + + stream = next(iter(publisher.streams)) + + # Publish a heavy burst of large payloads against the non-consuming + # subscriber. Each payload is big enough that tornado's IOStream + # cannot drain it to the (non-reading) UNIX socket immediately, so + # the per-stream pending-write counter accumulates. + big_payload = "x" * (256 * 1024) + for _ in range(64): + publisher.publish(big_payload) + # Yield so the spawned _write coroutines actually start. + await salt.ext.tornado.gen.sleep(0) + + # The per-stream pending count must never exceed the configured cap. + assert ( + publisher._pending_writes.get(stream, 0) + <= opts["ipc_publisher_pending_writes"] + ), ( + "publisher accumulated more pending writes than" + f" ipc_publisher_pending_writes={opts['ipc_publisher_pending_writes']!r}" + f" for a non-consuming subscriber:" + f" {publisher._pending_writes.get(stream, 0)}" + ) + + try: + slow.close() + except OSError: + pass + finally: + publisher.close() + + async def test_ipc_client_connect_after_close_no_attribute_error(io_loop, tmp_path): """ Regression for #68993.