Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions changes/ab03fe160c11224d2e8b98043be9bf3d.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
desc: Add a new Cell configuration option, ``limit:fd:free``. This represents the
  minimum percentage of file descriptors that must be free in order for a Synapse
  service to start up without entering a read-only state. This value is also
  checked every minute, and the Cell Nexus will be disabled if the percentage of
  free file descriptors drops below the specified value. This value defaults to
  five percent ( ``5 %`` ) of available file descriptors.
desc:literal: false
prs: []
type: feat
...
2 changes: 2 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -5144,6 +5144,7 @@ async def addView(self, vdef, nexs=True):
s_schemas.reqValidView(vdef)

if nexs:
self.checkOpenFd()
return await self._push('view:add', vdef)
else:
return await self._addView(vdef)
Expand Down Expand Up @@ -5484,6 +5485,7 @@ async def addLayer(self, ldef=None, nexs=True):
s_layer.reqValidLdef(ldef)

if nexs:
self.checkOpenFd()
return await self._push('layer:add', ldef)
else:
return await self._addLayer(ldef, (None, None))
Expand Down
83 changes: 73 additions & 10 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import argparse
import datetime
import platform
import resource
import tempfile
import functools
import contextlib
Expand Down Expand Up @@ -85,7 +86,8 @@

feat_aha_callpeers_v1 = ('callpeers', 1)

diskspace = "Insufficient free space on disk."
diskspace_mesg = "Insufficient free space on disk."
openfd_mesg = "Insufficient open file descriptors available."

def adminapi(log=False):
'''
Expand Down Expand Up @@ -1026,6 +1028,13 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
'minimum': 0,
'maximum': 100,
},
'limit:fd:free': {
'default': 5,
'description': 'Minimum amount, as a percentage, of unused file descriptors before setting the cell read-only.',
'type': ['integer', 'null'],
'minimum': 0,
'maximum': 100,
},
'health:sysctl:checks': {
'default': True,
'description': 'Enable sysctl parameter checks and warn if values are not optimal.',
Expand Down Expand Up @@ -1156,6 +1165,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):

BACKUP_SPAWN_TIMEOUT = 60.0
FREE_SPACE_CHECK_FREQ = 60.0
OPEN_FD_CHECK_FREQ = 60.0

COMMIT = s_version.commit
VERSION = s_version.version
Expand Down Expand Up @@ -1194,6 +1204,7 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
self.https_listeners = []
self.ahaclient = None
self._checkspace = s_coro.Event()
self._checkopenfd = s_coro.Event()
self._reloadfuncs = {} # name -> func

self.nexslock = asyncio.Lock()
Expand All @@ -1212,16 +1223,20 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
mesg = f'Booting {self.getCellType()} in safe-mode. Some functionality may be disabled.'
logger.warning(mesg)

self.minfree = self.conf.get('limit:disk:free')
if self.minfree is not None:
self.minfree = self.minfree / 100
self.min_disk_free = self.conf.get('limit:disk:free')
if self.min_disk_free is not None:
self.min_disk_free = self.min_disk_free / 100

disk = shutil.disk_usage(self.dirn)
if (disk.free / disk.total) <= self.minfree:
if (disk.free / disk.total) <= self.min_disk_free:
free = disk.free / disk.total * 100
mesg = f'Free space on {self.dirn} below minimum threshold (currently {free:.2f}%)'
raise s_exc.LowSpace(mesg=mesg, dirn=self.dirn)

self.min_fd_free = self.conf.get('limit:fd:free')
if self.min_fd_free is not None:
self.min_fd_free = self.min_fd_free / 100

self._delTmpFiles()

if self.conf.get('onboot:optimize'):
Expand Down Expand Up @@ -1397,6 +1412,10 @@ async def fini():
# phase 5 - service networking
await self.initServiceNetwork()

# End of __anit__ - wake up the fd loop in case startup has changed the cell
# such that the service should now go into a read-only mode.
self.checkOpenFd()

async def _storCellHiveMigration(self):
logger.warning(f'migrating Cell ({self.getCellType()}) info out of hive')

Expand Down Expand Up @@ -1775,6 +1794,10 @@ async def _bumpCellVers(self, name, updates, nexs=True):
def checkFreeSpace(self):
    '''
    Request a free disk space check by setting the ``_checkspace`` event.
    '''
    wakeup = self._checkspace
    wakeup.set()

def checkOpenFd(self):
    '''
    Request an open file descriptor check by setting the ``_checkopenfd`` event.

    The open fd loop waits on this event, so setting it triggers a check
    without waiting for the next timed interval.
    '''
    # TODO Insert this call before places where a user may open persistent files ( mainly lmdb slabs ! )
    wakeup = self._checkopenfd
    wakeup.set()

async def _runFreeSpaceLoop(self):

while not self.isfini:
Expand All @@ -1785,15 +1808,15 @@ async def _runFreeSpaceLoop(self):

disk = shutil.disk_usage(self.dirn)

if (disk.free / disk.total) <= self.minfree:
if (disk.free / disk.total) <= self.min_disk_free:

await nexsroot.addWriteHold(diskspace)
await nexsroot.addWriteHold(diskspace_mesg)

mesg = f'Free space on {self.dirn} below minimum threshold (currently ' \
f'{disk.free / disk.total * 100:.2f}%), setting Cell to read-only.'
logger.error(mesg)

elif nexsroot.readonly and await nexsroot.delWriteHold(diskspace):
elif nexsroot.readonly and await nexsroot.delWriteHold(diskspace_mesg):

mesg = f'Free space on {self.dirn} above minimum threshold (currently ' \
f'{disk.free / disk.total * 100:.2f}%), removing free space write hold.'
Expand Down Expand Up @@ -1825,6 +1848,41 @@ async def _runSysctlLoop(self):

await self.waitfini(self.SYSCTL_CHECK_FREQ)

async def _runOpenFdLoop(self):
    '''
    Monitor the fraction of free file descriptors and manage the open fd write hold.

    Each pass samples ``s_thisplat.getOpenFdInfo()``. When the soft limit is
    finite and the free fraction is at or below ``self.min_fd_free`` a write
    hold is added to the nexus root; otherwise any existing open fd write hold
    is removed. The loop then waits up to ``OPEN_FD_CHECK_FREQ`` seconds, or
    until ``checkOpenFd()`` sets the wake up event.
    '''
    while not self.isfini:

        nexsroot = self.getCellNexsRoot()

        # Clear before sampling so a checkOpenFd() call made during this pass
        # causes another pass instead of being lost.
        self._checkopenfd.clear()

        fdusage = s_thisplat.getOpenFdInfo()

        limit = fdusage['soft_limit']
        usage = fdusage['usage']

        if limit == resource.RLIM_INFINITY:
            # An unlimited soft limit can never be exhausted, and a ratio
            # against the sentinel value would be meaningless.
            free = 1.0
            islow = False

        elif limit <= 0:
            # No descriptors may be opened at all; avoid dividing by zero.
            free = 0.0
            islow = True

        else:
            free = (limit - usage) / limit
            islow = free <= self.min_fd_free

        if islow:

            await nexsroot.addWriteHold(openfd_mesg)

            mesg = 'Available file descriptors has dropped below minimum threshold ' \
                   f'(currently {free * 100:.2f}%), setting Cell to read-only.'
            logger.error(mesg, extra={'synapse': fdusage})

        elif nexsroot.readonly and await nexsroot.delWriteHold(openfd_mesg):

            mesg = 'Available file descriptors above minimum threshold ' \
                   f'(currently {free * 100:.2f}%), removing file descriptor write hold.'
            logger.error(mesg, extra={'synapse': fdusage})

        await self._checkopenfd.timewait(timeout=self.OPEN_FD_CHECK_FREQ)

async def _initAhaRegistry(self):

ahaurls = self.conf.get('aha:registry')
Expand Down Expand Up @@ -2087,9 +2145,12 @@ async def initNexusSubsystem(self):
await self.nexsroot.startup()
await self.setCellActive(self.conf.get('mirror') is None)

if self.minfree is not None:
if self.min_disk_free is not None:
self.schedCoro(self._runFreeSpaceLoop())

if self.min_fd_free is not None:
self.schedCoro(self._runOpenFdLoop())

async def _bindDmonListen(self):

# functionalized so downstream code can bind early.
Expand Down Expand Up @@ -2656,7 +2717,7 @@ def _reqBackupSpace(self):
cellsize, _ = s_common.getDirSize(self.dirn)

if os.stat(self.dirn).st_dev == os.stat(self.backdirn).st_dev:
reqspace = self.minfree * disk.total + cellsize
# The reserve kept free on a shared volume is the disk threshold, not the file descriptor threshold.
reqspace = self.min_disk_free * disk.total + cellsize
else:
reqspace = cellsize

Expand Down Expand Up @@ -4897,6 +4958,7 @@ async def getSystemInfo(self):
availmem = s_thisplat.getAvailableMemory()
pyversion = platform.python_version()
cpucount = multiprocessing.cpu_count()
fdusage = s_thisplat.getOpenFdInfo()
sysctls = s_thisplat.getSysctls()
tmpdir = s_thisplat.getTempDir()

Expand All @@ -4916,6 +4978,7 @@ async def getSystemInfo(self):
'cpucount': cpucount, # Number of CPUs on system
'sysctls': sysctls, # Performance related sysctls
'tmpdir': tmpdir, # Temporary File / Folder Directory
'fdusage': fdusage, # Soft limit, hard limit, and open file descriptor count for the current process.
}

return retn
Expand Down
18 changes: 18 additions & 0 deletions synapse/lib/platforms/darwin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import os
import errno
import logging
import resource

logger = logging.getLogger(__name__)

def initHostInfo():
    '''
    Build the host info dictionary describing the darwin platform.

    Returns:
        dict: The binary format, platform name and supported feature flags.
    '''
    info = dict(format='macho', platform='darwin')
    # Feature flag for open file descriptor reporting ( see getOpenFdInfo() ).
    info['hasopenfds'] = True
    return info

def getOpenFdInfo():
    '''
    Get the file descriptor limits and current usage for this process.

    Returns:
        dict: ``soft_limit`` and ``hard_limit`` from ``RLIMIT_NOFILE`` and
        ``usage``, the number of entries listed in ``/dev/fd``.

    Raises:
        OSError: If ``/dev/fd`` cannot be listed for a reason other than EMFILE.
    '''
    soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        usage = len(os.listdir('/dev/fd'))
    except OSError as err:
        if err.errno != errno.EMFILE:
            raise
        # We've hit the maximum allowed files and cannot list the contents of /dev/fd;
        # so we set usage to soft_limit so the caller can know that we're exactly at the limit.
        usage = soft_limit
    return {'soft_limit': soft_limit, 'hard_limit': hard_limit, 'usage': usage}
15 changes: 15 additions & 0 deletions synapse/lib/platforms/linux.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import errno
import logging
import resource
import contextlib
Expand All @@ -19,6 +20,7 @@ def initHostInfo():
'platform': 'linux',
'hasmemlocking': True, # has mlock, and all the below related functions
'hassysctls': True,
'hasopenfds': True,
}

def getFileMappedRegion(filename):
Expand Down Expand Up @@ -105,6 +107,19 @@ def getTotalMemory():
logger.warning('Unable to find max memory limit') # pragma: no cover
return 0 # pragma: no cover

def getOpenFdInfo():
    '''
    Get the file descriptor limits and current usage for this process.

    Returns:
        dict: ``soft_limit`` and ``hard_limit`` from ``RLIMIT_NOFILE`` and
        ``usage``, the number of entries listed in ``/proc/self/fd``.

    Raises:
        OSError: If ``/proc/self/fd`` cannot be listed for a reason other than EMFILE.
    '''
    soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        usage = len(os.listdir('/proc/self/fd'))
    except OSError as err:
        if err.errno != errno.EMFILE:
            raise
        # We've hit the maximum allowed files and cannot list the contents of /proc/self/fd;
        # so we set usage to soft_limit so the caller can know that we're exactly at the limit.
        usage = soft_limit
    return {'soft_limit': soft_limit, 'hard_limit': hard_limit, 'usage': usage}

def getSysctls():
_sysctls = (
Expand Down
6 changes: 6 additions & 0 deletions synapse/lib/spooled.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ async def _initFallBack(self):

slabpath = tempfile.mkdtemp(dir=dirn, prefix='spooled_', suffix='.lmdb')

if self.cell is not None:
# Wake the host cell and give it the opportunity to check openfd counts.
# This will not stop the spooled set creation but may prevent downstream
# activity from causing an issue.
self.cell.checkOpenFd()

self.slab = await s_lmdbslab.Slab.anit(slabpath, map_size=DEFAULT_MAPSIZE)
if self.cell is not None:
self.slab.addResizeCallback(self.cell.checkFreeSpace)
Expand Down
1 change: 1 addition & 0 deletions synapse/lib/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,7 @@ async def insertParentFork(self, useriden, name=None):
s_layer.reqValidLdef(ldef)
s_schemas.reqValidView(vdef)

self.core.checkOpenFd()
return await self._push('view:forkparent', ldef, vdef)

@s_nexus.Pusher.onPush('view:forkparent', passitem=True)
Expand Down
Loading