diff --git a/changes/ab03fe160c11224d2e8b98043be9bf3d.yaml b/changes/ab03fe160c11224d2e8b98043be9bf3d.yaml new file mode 100644 index 00000000000..f00a50ad352 --- /dev/null +++ b/changes/ab03fe160c11224d2e8b98043be9bf3d.yaml @@ -0,0 +1,11 @@ +--- +desc: Add a new Cell configuration option, ``limit:fd:free``. This represents the + minimum percentage of available file descriptors that a Synapse service that + is required in order to start up without entering a read-only state. This value + is also monitored every minute and will disable the Cell Nexus if the free + file descriptors drops below the specified value. This value defaults to five + percent ( ``5 %`` ) of available file descriptors. +desc:literal: false +prs: [] +type: feat +... diff --git a/synapse/cortex.py b/synapse/cortex.py index f5efa6d550c..5c6158ecce0 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -5144,6 +5144,7 @@ async def addView(self, vdef, nexs=True): s_schemas.reqValidView(vdef) if nexs: + self.checkOpenFd() return await self._push('view:add', vdef) else: return await self._addView(vdef) @@ -5484,6 +5485,7 @@ async def addLayer(self, ldef=None, nexs=True): s_layer.reqValidLdef(ldef) if nexs: + self.checkOpenFd() return await self._push('layer:add', ldef) else: return await self._addLayer(ldef, (None, None)) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 7fbee4cb772..c3fe3b4e368 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -13,6 +13,7 @@ import argparse import datetime import platform +import resource import tempfile import functools import contextlib @@ -85,7 +86,8 @@ feat_aha_callpeers_v1 = ('callpeers', 1) -diskspace = "Insufficient free space on disk." +diskspace_mesg = "Insufficient free space on disk." +openfd_mesg = "Insufficient open file descriptors available." def adminapi(log=False): ''' @@ -1026,6 +1028,13 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): 'minimum': 0, 'maximum': 100, }, + 'limit:fd:free': { + 'default': 5, + 'description': 'Minimum amount, as a percentage, of unused file descriptors before setting the cell read-only.', + 'type': ['integer', 'null'], + 'minimum': 0, + 'maximum': 100, + }, 'health:sysctl:checks': { 'default': True, 'description': 'Enable sysctl parameter checks and warn if values are not optimal.', @@ -1156,6 +1165,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): BACKUP_SPAWN_TIMEOUT = 60.0 FREE_SPACE_CHECK_FREQ = 60.0 + OPEN_FD_CHECK_FREQ = 60.0 COMMIT = s_version.commit VERSION = s_version.version @@ -1194,6 +1204,7 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): self.https_listeners = [] self.ahaclient = None self._checkspace = s_coro.Event() + self._checkopenfd = s_coro.Event() self._reloadfuncs = {} # name -> func self.nexslock = asyncio.Lock() @@ -1212,16 +1223,20 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): mesg = f'Booting {self.getCellType()} in safe-mode. Some functionality may be disabled.' logger.warning(mesg) - self.minfree = self.conf.get('limit:disk:free') - if self.minfree is not None: - self.minfree = self.minfree / 100 + self.min_disk_free = self.conf.get('limit:disk:free') + if self.min_disk_free is not None: + self.min_disk_free = self.min_disk_free / 100 disk = shutil.disk_usage(self.dirn) - if (disk.free / disk.total) <= self.minfree: + if (disk.free / disk.total) <= self.min_disk_free: free = disk.free / disk.total * 100 mesg = f'Free space on {self.dirn} below minimum threshold (currently {free:.2f}%)' raise s_exc.LowSpace(mesg=mesg, dirn=self.dirn) + self.min_fd_free = self.conf.get('limit:fd:free') + if self.min_fd_free is not None: + self.min_fd_free = self.min_fd_free / 100 + self._delTmpFiles() if self.conf.get('onboot:optimize'): @@ -1397,6 +1412,10 @@ async def fini(): # phase 5 - service networking await self.initServiceNetwork() + # End of __anit__ - wake up the fd loop in the event that the cell has + # changed such that the service would now go into a write only mode. + self.checkOpenFd() + async def _storCellHiveMigration(self): logger.warning(f'migrating Cell ({self.getCellType()}) info out of hive') @@ -1775,6 +1794,10 @@ async def _bumpCellVers(self, name, updates, nexs=True): def checkFreeSpace(self): self._checkspace.set() + def checkOpenFd(self): + # TODO Insert this call before places where a user may open persistent files ( mainly lmdb slabs ! ) + self._checkopenfd.set() + async def _runFreeSpaceLoop(self): while not self.isfini: @@ -1785,15 +1808,15 @@ async def _runFreeSpaceLoop(self): disk = shutil.disk_usage(self.dirn) - if (disk.free / disk.total) <= self.minfree: + if (disk.free / disk.total) <= self.min_disk_free: - await nexsroot.addWriteHold(diskspace) + await nexsroot.addWriteHold(diskspace_mesg) mesg = f'Free space on {self.dirn} below minimum threshold (currently ' \ f'{disk.free / disk.total * 100:.2f}%), setting Cell to read-only.' logger.error(mesg) - elif nexsroot.readonly and await nexsroot.delWriteHold(diskspace): + elif nexsroot.readonly and await nexsroot.delWriteHold(diskspace_mesg): mesg = f'Free space on {self.dirn} above minimum threshold (currently ' \ f'{disk.free / disk.total * 100:.2f}%), removing free space write hold.' @@ -1825,6 +1848,41 @@ async def _runSysctlLoop(self): await self.waitfini(self.SYSCTL_CHECK_FREQ) + async def _runOpenFdLoop(self): + + while not self.isfini: + + nexsroot = self.getCellNexsRoot() + + self._checkopenfd.clear() + + fdusage = s_thisplat.getOpenFdInfo() + + limit = fdusage['soft_limit'] + usage = fdusage['usage'] + + free = (limit - usage) / limit + + # If the soft_limit is not unlimited ( signaled via resource.RLIM_INFINITY ) and the + # free percentage is < self.min_fd_free, we lock the cell; otherwise we remove our lock + # on the cell. + + if limit != resource.RLIM_INFINITY and free <= self.min_fd_free: + + await nexsroot.addWriteHold(openfd_mesg) + + mesg = f'Available file descriptors has dropped below minimum threshold' \ + f'(currently {free * 100:.2f}%), setting Cell to read-only.' + logger.error(mesg, extra={'synapse': fdusage}) + + elif nexsroot.readonly and await nexsroot.delWriteHold(openfd_mesg): + + mesg = f'Available file descriptors above minimum threshold' \ + f'(currently {free * 100:.2f}%), removing file descriptor write hold.' + logger.error(mesg, extra={'synapse': fdusage}) + + await self._checkopenfd.timewait(timeout=self.OPEN_FD_CHECK_FREQ) + async def _initAhaRegistry(self): ahaurls = self.conf.get('aha:registry') @@ -2087,9 +2145,12 @@ async def initNexusSubsystem(self): await self.nexsroot.startup() await self.setCellActive(self.conf.get('mirror') is None) - if self.minfree is not None: + if self.min_disk_free is not None: self.schedCoro(self._runFreeSpaceLoop()) + if self.min_fd_free is not None: + self.schedCoro(self._runOpenFdLoop()) + async def _bindDmonListen(self): # functionalized so downstream code can bind early. @@ -2656,7 +2717,7 @@ def _reqBackupSpace(self): cellsize, _ = s_common.getDirSize(self.dirn) if os.stat(self.dirn).st_dev == os.stat(self.backdirn).st_dev: - reqspace = self.minfree * disk.total + cellsize + reqspace = self.min_fd_free * disk.total + cellsize else: reqspace = cellsize @@ -4897,6 +4958,7 @@ async def getSystemInfo(self): availmem = s_thisplat.getAvailableMemory() pyversion = platform.python_version() cpucount = multiprocessing.cpu_count() + fdusage = s_thisplat.getOpenFdInfo() sysctls = s_thisplat.getSysctls() tmpdir = s_thisplat.getTempDir() @@ -4916,6 +4978,7 @@ async def getSystemInfo(self): 'cpucount': cpucount, # Number of CPUs on system 'sysctls': sysctls, # Performance related sysctls 'tmpdir': tmpdir, # Temporary File / Folder Directory + 'fdusage': fdusage, # Soft limits, hard limits, and open fd descriptors for the current process. } return retn diff --git a/synapse/lib/platforms/darwin.py b/synapse/lib/platforms/darwin.py index 321bf33791f..3b304e8989b 100644 --- a/synapse/lib/platforms/darwin.py +++ b/synapse/lib/platforms/darwin.py @@ -1,4 +1,7 @@ +import os +import errno import logging +import resource logger = logging.getLogger(__name__) @@ -6,4 +9,19 @@ def initHostInfo(): return { 'format': 'macho', 'platform': 'darwin', + 'hasopenfds': True, } + +def getOpenFdInfo(): + soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + usage = len(os.listdir(f'/dev/fd')) + except OSError as err: + if err.errno == errno.EMFILE: + # We've hit the maximum allowed files and cannot list contents of /proc/; + # so we set usage to soft_limit so the caller can know that we're exactly at the limit. + usage = soft_limit + else: + raise + ret = {'soft_limit': soft_limit, 'hard_limit': hard_limit, 'usage': usage} + return ret diff --git a/synapse/lib/platforms/linux.py b/synapse/lib/platforms/linux.py index 55f2b1caa90..1edd3912b40 100644 --- a/synapse/lib/platforms/linux.py +++ b/synapse/lib/platforms/linux.py @@ -1,4 +1,5 @@ import os +import errno import logging import resource import contextlib @@ -19,6 +20,7 @@ def initHostInfo(): 'platform': 'linux', 'hasmemlocking': True, # has mlock, and all the below related functions 'hassysctls': True, + 'hasopenfds': True, } def getFileMappedRegion(filename): @@ -105,6 +107,19 @@ def getTotalMemory(): logger.warning('Unable to find max memory limit') # pragma: no cover return 0 # pragma: no cover +def getOpenFdInfo(): + soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + usage = len(os.listdir(f'/proc/self/fd')) + except OSError as err: + if err.errno == errno.EMFILE: + # We've hit the maximum allowed files and cannot list contents of /proc/; + # so we set usage to soft_limit so the caller can know that we're exactly at the limit. + usage = soft_limit + else: + raise + ret = {'soft_limit': soft_limit, 'hard_limit': hard_limit, 'usage': usage} + return ret def getSysctls(): _sysctls = ( diff --git a/synapse/lib/spooled.py b/synapse/lib/spooled.py index c89cfbc4acf..7e7cb9d210e 100644 --- a/synapse/lib/spooled.py +++ b/synapse/lib/spooled.py @@ -49,6 +49,12 @@ async def _initFallBack(self): slabpath = tempfile.mkdtemp(dir=dirn, prefix='spooled_', suffix='.lmdb') + if self.cell is not None: + # Wake the host cell and give it the opportunity to check openfd counts. + # This will not stop the spooled set creation but may prevent downstream + # activity from causing an issue. + self.cell.checkOpenFd() + self.slab = await s_lmdbslab.Slab.anit(slabpath, map_size=DEFAULT_MAPSIZE) if self.cell is not None: self.slab.addResizeCallback(self.cell.checkFreeSpace) diff --git a/synapse/lib/view.py b/synapse/lib/view.py index 8e8cb8a8f03..dd101b75e42 100644 --- a/synapse/lib/view.py +++ b/synapse/lib/view.py @@ -1402,6 +1402,7 @@ async def insertParentFork(self, useriden, name=None): s_layer.reqValidLdef(ldef) s_schemas.reqValidView(vdef) + self.core.checkOpenFd() return await self._push('view:forkparent', ldef, vdef) @s_nexus.Pusher.onPush('view:forkparent', passitem=True) diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index 353d6793695..7d9698af950 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -2593,6 +2593,130 @@ async def test_passwd_regression(self): self.false(await root.tryPasswd('root')) self.true(await root.tryPasswd('supersecretpassword')) + async def test_cell_minfiles(self): + self.thisHostMust(hasopenfds=True) + + with self.raises(s_exc.IsReadOnly) as cm: + conf = {'limit:fd:free': 100} + async with self.getTestCell(conf=conf) as cell: + await asyncio.sleep(0.1) + cell.checkOpenFd() + await cell.sync() + self.isin('Insufficient open file descriptors available.', cm.exception.get('mesg')) + + revt = asyncio.Event() + addWriteHold = s_nexus.NexsRoot.addWriteHold + delWriteHold = s_nexus.NexsRoot.delWriteHold + + async def wrapAddWriteHold(root, reason): + retn = await addWriteHold(root, reason) + revt.set() + return retn + + async def wrapDelWriteHold(root, reason): + retn = await delWriteHold(root, reason) + revt.set() + return retn + + _ntuple_diskusage = collections.namedtuple('usage', 'total used free') + + def full_fds(): + return {'hard_limit': 256, 'soft_limit': 256, 'usage': 255} + + def unlimited_fds(): + return {'hard_limit': -1, 'soft_limit': -1, 'usage': 255} + + revt = asyncio.Event() + addWriteHold = s_nexus.NexsRoot.addWriteHold + delWriteHold = s_nexus.NexsRoot.delWriteHold + + async def wrapAddWriteHold(root, reason): + retn = await addWriteHold(root, reason) + revt.set() + return retn + + async def wrapDelWriteHold(root, reason): + retn = await delWriteHold(root, reason) + revt.set() + return retn + + with mock.patch.object(s_cell.Cell, 'OPEN_FD_CHECK_FREQ', 0.1), \ + mock.patch.object(s_nexus.NexsRoot, 'addWriteHold', wrapAddWriteHold), \ + mock.patch.object(s_nexus.NexsRoot, 'delWriteHold', wrapDelWriteHold): + async with self.getTestCore() as core: + + fork_q = 'view.fork --name somefork $lib.view.get().iden' + + await core.nodes(fork_q) + + with mock.patch('synapse.lib.thisplat.getOpenFdInfo', full_fds): + self.true(await asyncio.wait_for(revt.wait(), 6)) + + msgs = await core.stormlist(fork_q) + self.stormIsInErr(s_cell.openfd_mesg, msgs) + + revt.clear() + self.true(await asyncio.wait_for(revt.wait(), 6)) + + await core.nodes(fork_q) + + # Check with an unlimited ulimit. + # First we can set the write hold manually + await cell.nexsroot.addWriteHold(s_cell.openfd_mesg) + self.true(cell.nexsroot.readonly) + + with mock.patch('synapse.lib.thisplat.getOpenFdInfo', unlimited_fds): + + # Then see it be cleared + self.true(await asyncio.wait_for(revt.wait(), 1)) + + msgs = await core.stormlist(fork_q) + self.stormHasNoWarnErr(msgs) + + # Mirrors can be blocked and then recover + with self.getTestDir() as dirn: + + path00 = s_common.gendir(dirn, 'core00') + path01 = s_common.gendir(dirn, 'core01') + + conf = {'limit:fd:free': 0} + async with self.getTestCore(dirn=path00, conf=conf) as core00: + await core00.nodes('[ inet:ipv4=1.2.3.4 ]') + + s_tools_backup.backup(path00, path01) + + async with self.getTestCore(dirn=path00, conf=conf) as core00: + + core01conf = {'mirror': core00.getLocalUrl()} + + async with self.getTestCore(dirn=path01, conf=core01conf) as core01: + + await core01.sync() + + revt.clear() + with mock.patch('synapse.lib.thisplat.getOpenFdInfo', full_fds): + self.true(await asyncio.wait_for(revt.wait(), 1)) + + msgs = await core01.stormlist('[inet:fqdn=newp.fail]') + self.stormIsInErr(s_cell.openfd_mesg, msgs) + msgs = await core01.stormlist('[inet:fqdn=newp.fail]') + self.stormIsInErr(s_cell.openfd_mesg, msgs) + self.len(1, await core00.nodes('[ inet:ipv4=2.3.4.5 ]')) + + offs = await core00.getNexsIndx() + self.false(await core01.waitNexsOffs(offs, 1)) + + self.len(1, await core01.nodes('inet:ipv4=1.2.3.4')) + self.len(0, await core01.nodes('inet:ipv4=2.3.4.5')) + revt.clear() + + revt.clear() + self.true(await asyncio.wait_for(revt.wait(), 1)) + await core01.sync() + + self.len(1, await core01.nodes('inet:ipv4=1.2.3.4')) + self.len(1, await core01.nodes('inet:ipv4=2.3.4.5')) + async def test_cell_minspace(self): with self.raises(s_exc.LowSpace): @@ -2618,8 +2742,6 @@ async def wrapDelWriteHold(root, reason): revt.set() return retn - errmsg = 'Insufficient free space on disk.' - with mock.patch.object(s_cell.Cell, 'FREE_SPACE_CHECK_FREQ', 0.1), \ mock.patch.object(s_nexus.NexsRoot, 'addWriteHold', wrapAddWriteHold), \ mock.patch.object(s_nexus.NexsRoot, 'delWriteHold', wrapDelWriteHold): @@ -2644,7 +2766,7 @@ async def wrapDelWriteHold(root, reason): self.true(await asyncio.wait_for(revt.wait(), 1)) msgs = await core.stormlist('[inet:fqdn=newp.fail]') - self.stormIsInErr(errmsg, msgs) + self.stormIsInErr(s_cell.diskspace_mesg, msgs) revt.clear() self.true(await asyncio.wait_for(revt.wait(), 1)) @@ -2675,9 +2797,9 @@ async def wrapDelWriteHold(root, reason): self.true(await asyncio.wait_for(revt.wait(), 1)) msgs = await core01.stormlist('[inet:fqdn=newp.fail]') - self.stormIsInErr(errmsg, msgs) + self.stormIsInErr(s_cell.diskspace_mesg, msgs) msgs = await core01.stormlist('[inet:fqdn=newp.fail]') - self.stormIsInErr(errmsg, msgs) + self.stormIsInErr(s_cell.diskspace_mesg, msgs) self.len(1, await core00.nodes('[ inet:ipv4=2.3.4.5 ]')) offs = await core00.getNexsIndx() @@ -2707,7 +2829,7 @@ async def wrapDelWriteHold(root, reason): with mock.patch('shutil.disk_usage', full_disk): opts = {'view': viewiden} msgs = await core.stormlist('for $x in $lib.range(20000) {[inet:ipv4=$x]}', opts=opts) - self.stormIsInErr(errmsg, msgs) + self.stormIsInErr(s_cell.diskspace_mesg, msgs) nodes = await core.nodes('inet:ipv4', opts=opts) self.gt(len(nodes), 0) self.lt(len(nodes), 20000) diff --git a/synapse/tests/test_lib_platforms_linux.py b/synapse/tests/test_lib_platforms_linux.py index bd4547b5bb9..fec5bc98607 100644 --- a/synapse/tests/test_lib_platforms_linux.py +++ b/synapse/tests/test_lib_platforms_linux.py @@ -1,5 +1,8 @@ +import errno import pathlib +import unittest.mock as mock + import synapse.exc as s_exc import synapse.tests.utils as s_t_utils @@ -64,3 +67,34 @@ def test_sysctls(self): self.isinstance(ret['vm.dirty_bytes'], int) self.isin('vm.dirty_background_bytes', ret) self.isinstance(ret['vm.dirty_background_bytes'], int) + + def test_openfds(self): + self.thisHostMust(hasopenfds=True) + ret = s_thisplat.getOpenFdInfo() + self.isinstance(ret, dict) + self.isin('hard_limit', ret) + self.isin('soft_limit', ret) + self.isin('usage', ret) + self.true(ret.get('usage') > 0) + + def bad_listdir_emfile(path): + e = OSError('ruh roh') + e.errno = errno.EMFILE + raise e + + def bad_listdir_enoent(path): + e = OSError('ruh roh') + e.errno = errno.ENOENT + raise e + + with mock.patch('os.listdir', bad_listdir_emfile): + ret = s_thisplat.getOpenFdInfo() + self.isinstance(ret, dict) + self.isin('hard_limit', ret) + self.isin('soft_limit', ret) + self.true(ret.get('usage') > 0) + self.eq(ret.get('usage'), ret.get('soft_limit'), ret) + + with mock.patch('os.listdir', bad_listdir_enoent): + with self.raises(OSError) as cm: + s_thisplat.getOpenFdInfo()