Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/68114.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Disconnect non-consuming IPC subscribers after `ipc_write_timeout` seconds, and bound per-subscriber in-flight writes via `ipc_publisher_pending_writes`, so the publisher no longer leaks memory on stalled clients while still letting bursts through to consumers that are momentarily behind.
56 changes: 40 additions & 16 deletions doc/ref/configuration/master.rst
Original file line number Diff line number Diff line change
Expand Up @@ -374,22 +374,6 @@ Set the default timeout for the salt command and api.

.. conf_master:: loop_interval

.. conf_minion:: ipc_write_timeout

``ipc_write_timeout``
---------------------

.. versionadded:: 3006.11

Default: ``15``

How many seconds the event publisher process will wait after a client stops
responding before the client will be disconnected.

.. code-block:: yaml

ipc_write_timeout: 15

``loop_interval``
-----------------

Expand Down Expand Up @@ -1090,6 +1074,46 @@ this option.

ipc_write_buffer: 10485760

.. conf_master:: ipc_write_timeout

``ipc_write_timeout``
-----------------------

Default: ``30``

Per-message write timeout (in seconds) for the IPC publisher used by the
master event bus. If a subscriber stops consuming messages, pending writes
to that subscriber will be held in memory by the publisher, which can cause
unbounded memory growth. When a single write exceeds ``ipc_write_timeout``,
the publisher disconnects the offending subscriber to reclaim memory.
Setting this option to ``0`` disables the timeout (legacy behavior).

.. code-block:: yaml

ipc_write_timeout: 30

.. conf_master:: ipc_publisher_pending_writes

``ipc_publisher_pending_writes``
--------------------------------

Default: ``10000``

Maximum number of in-flight ``stream.write()`` coroutines the IPC publisher
will keep queued per subscriber. Each pending write holds a reference to the
payload being delivered, so an unbounded queue against a non-consuming
subscriber is what causes the master event publisher's memory to grow. When
a subscriber accumulates more than ``ipc_publisher_pending_writes`` pending
writes the publisher skips new messages for it until the queue drains; if
the subscriber never drains, the oldest pending write will eventually
trip ``ipc_write_timeout`` and the subscriber will be dropped. The default
is high enough to absorb legitimate event bursts to consumers that are
momentarily behind. Setting this option to ``0`` disables the bound.

.. code-block:: yaml

ipc_publisher_pending_writes: 10000

.. conf_master:: tcp_master_pub_port

``tcp_master_pub_port``
Expand Down
18 changes: 18 additions & 0 deletions doc/ref/configuration/minion.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,24 @@ this option.

ipc_write_buffer: 10485760

.. conf_minion:: ipc_write_timeout

``ipc_write_timeout``
-----------------------

Default: ``30``

Per-message write timeout (in seconds) for the IPC publisher used by the
event bus. If a subscriber stops consuming messages, pending writes to that
subscriber will be held in memory by the publisher, which can cause
unbounded memory growth. When a single write exceeds ``ipc_write_timeout``,
the publisher disconnects the offending subscriber to reclaim memory.
Setting this option to ``0`` disables the timeout (legacy behavior).

.. code-block:: yaml

ipc_write_timeout: 30

.. conf_minion:: tcp_pub_port

``tcp_pub_port``
Expand Down
16 changes: 16 additions & 0 deletions salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from copy import deepcopy

import salt.crypt
import salt.defaults
import salt.defaults.exitcodes
import salt.exceptions
import salt.features
Expand Down Expand Up @@ -487,6 +488,17 @@ def _gather_buffer_space():
# IPC buffer size
# Refs https://github.com/saltstack/salt/issues/34215
"ipc_write_buffer": int,
# Per-message write timeout (seconds) for IPCMessagePublisher; if a
# subscriber does not consume within this window the publisher drops
# it to prevent unbounded memory growth. ``0`` disables the timeout.
# Refs https://github.com/saltstack/salt/issues/68114
"ipc_write_timeout": int,
# Maximum number of in-flight stream.write() coroutines per IPC
# subscriber before IPCMessagePublisher.publish() starts skipping that
# subscriber. Bounds memory growth from pending writes on backlogged
# consumers while leaving headroom for legitimate bursts. ``0``
# disables the bound. Refs https://github.com/saltstack/salt/issues/68114
"ipc_publisher_pending_writes": int,
# various subprocess niceness levels
"req_server_niceness": (type(None), int),
"pub_server_niceness": (type(None), int),
Expand Down Expand Up @@ -1180,6 +1192,8 @@ def _gather_buffer_space():
"mine_interval": 60,
"ipc_mode": _DFLT_IPC_MODE,
"ipc_write_buffer": _DFLT_IPC_WBUFFER,
"ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT,
"ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES,
"ipv6": None,
"file_buffer_size": 262144,
"tcp_pub_port": 4510,
Expand Down Expand Up @@ -1506,6 +1520,8 @@ def _gather_buffer_space():
"enforce_mine_cache": False,
"ipc_mode": _DFLT_IPC_MODE,
"ipc_write_buffer": _DFLT_IPC_WBUFFER,
"ipc_write_timeout": salt.defaults.IPC_WRITE_TIMEOUT,
"ipc_publisher_pending_writes": salt.defaults.IPC_PUBLISHER_PENDING_WRITES,
# various subprocess niceness levels
"req_server_niceness": None,
"pub_server_niceness": None,
Expand Down
13 changes: 13 additions & 0 deletions salt/defaults/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,16 @@ def __repr__(self):
cases are proper defaults and are also proper values to pass.
"""
NOT_SET = _Constant("NOT_SET")

# Default timeout (seconds) applied to IPCMessagePublisher writes; if a
# subscriber does not consume within this window the publisher drops it to
# prevent unbounded memory growth from pending writes. ``0`` disables the
# timeout (legacy behavior).
IPC_WRITE_TIMEOUT = 30

# Maximum number of in-flight stream.write() coroutines per IPC subscriber
# before IPCMessagePublisher.publish() starts skipping that subscriber. This
# bounds memory growth from pending writes on backlogged consumers while
# leaving enough headroom for legitimate bursts. ``0`` disables the bound
# (legacy behavior).
IPC_PUBLISHER_PENDING_WRITES = 10000
50 changes: 49 additions & 1 deletion salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
IPC transport classes
"""

import datetime
import errno
import logging
import socket
import time
import warnings

import salt.defaults
import salt.ext.tornado
import salt.ext.tornado.concurrent
import salt.ext.tornado.gen
Expand Down Expand Up @@ -508,6 +510,10 @@ def __init__(self, opts, socket_path, io_loop=None):
self.io_loop = io_loop or IOLoop.current()
self._closing = False
self.streams = set()
# Per-stream in-flight _write coroutine counter, used by publish()
# to bound queued writes on backlogged subscribers without dropping
# legitimate burst traffic on a momentarily-slow consumer.
self._pending_writes = {}

def start(self):
"""
Expand Down Expand Up @@ -536,8 +542,26 @@ def start(self):

@salt.ext.tornado.gen.coroutine
def _write(self, stream, pack):
timeout = self.opts.get("ipc_write_timeout", salt.defaults.IPC_WRITE_TIMEOUT)
try:
yield stream.write(pack)
if timeout and timeout > 0:
yield salt.ext.tornado.gen.with_timeout(
datetime.timedelta(seconds=timeout),
stream.write(pack),
quiet_exceptions=(StreamClosedError,),
)
else:
yield stream.write(pack)
except salt.ext.tornado.gen.TimeoutError:
log.warning(
"Non-consuming IPC subscriber on %s exceeded ipc_write_timeout"
" (%ss); dropping it to avoid unbounded memory growth",
self.socket_path,
timeout,
)
if not stream.closed():
stream.close()
self.streams.discard(stream)
except StreamClosedError:
log.trace("Client disconnected from IPC %s", self.socket_path)
self.streams.discard(stream)
Expand All @@ -546,6 +570,12 @@ def _write(self, stream, pack):
if not stream.closed():
stream.close()
self.streams.discard(stream)
finally:
pending = self._pending_writes.get(stream, 0) - 1
if pending <= 0:
self._pending_writes.pop(stream, None)
else:
self._pending_writes[stream] = pending

def publish(self, msg):
"""
Expand All @@ -555,7 +585,23 @@ def publish(self, msg):
return

pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
hwm = self.opts.get(
"ipc_publisher_pending_writes",
salt.defaults.IPC_PUBLISHER_PENDING_WRITES,
)
for stream in self.streams:
# Bound the number of in-flight _write coroutines per subscriber.
# Each spawn_callback(self._write, ...) holds a reference to
# ``pack`` until the write drains; an unbounded queue on a
# non-consuming subscriber is what causes the publisher RSS to
# balloon. A subscriber that exceeds ``hwm`` pending writes is
# treated as backlogged -- we skip new writes for it, and the
# timeout branch in _write() will eventually close the stream
# and discard it once ipc_write_timeout elapses on the oldest
# pending write. A hwm of 0 disables the bound (legacy behavior).
if hwm and self._pending_writes.get(stream, 0) >= hwm:
continue
self._pending_writes[stream] = self._pending_writes.get(stream, 0) + 1
self.io_loop.spawn_callback(self._write, stream, pack)

def handle_connection(self, connection, address):
Expand All @@ -574,6 +620,7 @@ def handle_connection(self, connection, address):

def discard_after_closed():
self.streams.discard(stream)
self._pending_writes.pop(stream, None)

stream.set_close_callback(discard_after_closed)
except Exception as exc: # pylint: disable=broad-except
Expand All @@ -591,6 +638,7 @@ def close(self):
for stream in self.streams:
stream.close()
self.streams.clear()
self._pending_writes.clear()
if hasattr(self.sock, "close"):
self.sock.close()

Expand Down
Loading
Loading