diff --git a/synapse/common.py b/synapse/common.py index c36ded1ce35..b8c35995eda 100644 --- a/synapse/common.py +++ b/synapse/common.py @@ -781,15 +781,24 @@ def makedirs(path, mode=0o777): def iterzip(*args, fillvalue=None): return itertools.zip_longest(*args, fillvalue=fillvalue) -def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None): +def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None, qsize=None): if structlog: structlog = 'true' else: structlog = 'false' + + if qsize is None: + qsize = s_structlog.DEFAULT_QSIZE + defval = os.getenv('SYN_LOG_LEVEL', defval) datefmt = os.getenv('SYN_LOG_DATEFORMAT', datefmt) structlog = envbool('SYN_LOG_STRUCT', structlog) - ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt} + + qsize = int(os.getenv('SYN_LOG_QSIZE', qsize)) + if qsize < 0: + raise s_exc.BadArg(mesg=f'SYN_LOG_QSIZE must be >=0, got {qsize}') + + ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt, 'qsize': qsize} return ret def normLogLevel(valu): @@ -820,7 +829,7 @@ def normLogLevel(valu): return normLogLevel(valu) raise s_exc.BadArg(mesg=f'Unknown log level type: {type(valu)} {valu}', valu=valu) -def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None): +def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None, qsize=None): ''' Configure synapse logging. @@ -829,15 +838,16 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non defval (str): Default log level. May be an integer. structlog (bool): Enabled structured (jsonl) logging output. datefmt (str): Optional strftime format string. - + qsize (int): Optional queue size for the root logger. Notes: This calls logging.basicConfig and should only be called once per process. 
Returns: None ''' - ret = _getLogConfFromEnv(defval, structlog, datefmt) + ret = _getLogConfFromEnv(defval, structlog, datefmt, qsize) + qsize = ret.get('qsize') datefmt = ret.get('datefmt') log_level = ret.get('defval') log_struct = ret.get('structlog') @@ -846,13 +856,15 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non log_level = normLogLevel(log_level) + handler = s_structlog.StreamHandlerWithQueue(qsize=qsize) + if log_struct: - handler = logging.StreamHandler() formatter = s_structlog.JsonFormatter(datefmt=datefmt) handler.setFormatter(formatter) logging.basicConfig(level=log_level, handlers=(handler,)) else: - logging.basicConfig(level=log_level, format=s_const.LOG_FORMAT, datefmt=datefmt) + logging.basicConfig(level=log_level, handlers=(handler,), format=s_const.LOG_FORMAT, datefmt=datefmt) + if log_setup: mlogger.info('log level set to %s', s_const.LOG_LEVEL_INVERSE_CHOICES.get(log_level)) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index e904edf5792..75efd39d716 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -16,6 +16,7 @@ import tempfile import functools import contextlib +import collections import multiprocessing import aiohttp @@ -905,6 +906,11 @@ async def getReloadableSystems(self): async def reload(self, subsystem=None): return await self.cell.reload(subsystem=subsystem) + @adminapi(log=True) + async def iterCellLogs(self): + async for mesg in self.cell.iterCellLogs(): + yield mesg + class Cell(s_nexus.Pusher, s_telepath.Aware): ''' A Cell() implements a synapse micro-service. 
@@ -1179,6 +1185,12 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): self.nexslock = asyncio.Lock() self.netready = asyncio.Event() + self._syn_log_queue = None # type: collections.deque + for handler in logging.getLogger('').handlers: + if hasattr(handler, '_syn_log_queue'): + self._syn_log_queue = handler._syn_log_queue + break + self.conf = self._initCellConf(conf) self.features = { 'tellready': 1, @@ -1531,6 +1543,8 @@ async def fini(self): retn = await s_nexus.Pusher.fini(self) if retn == 0: self._onFiniCellGuid() + if self._syn_log_queue is not None: + self._syn_log_queue = None return retn def _onFiniCellGuid(self): @@ -5084,6 +5098,12 @@ def getCachedSslCtx(self, opts=None, verify=None): key = tuple(sorted(opts.items())) return self._sslctx_cache.get(key) + async def iterCellLogs(self): + if self._syn_log_queue is None: + raise s_exc.SynErr(mesg='no queue available') + for mesg in list(self._syn_log_queue): + yield mesg + async def freeze(self, timeout=30): if self.paused: diff --git a/synapse/lib/stormlib/cell.py b/synapse/lib/stormlib/cell.py index c428f5080aa..8bc96ac7fcf 100644 --- a/synapse/lib/stormlib/cell.py +++ b/synapse/lib/stormlib/cell.py @@ -174,8 +174,27 @@ class CellLib(s_stormtypes.Lib): '(defaults to the Cortex if not provided).'}, ), 'returns': {'type': 'dict', 'desc': 'A dictionary containing uptime data.', }}}, + {'name': 'iterLogs', 'desc': 'Iterate over recent service logs.', + 'type': {'type': 'function', '_funcname': '_methIterLogs', + 'args': ( + {'name': 'name', 'type': 'str', 'default': None, + # FIXME This may need to accommodate other aha / telepath values + 'desc': 'The name, or iden, of the service to get logs for ' + '(defaults to the Cortex if not provided).'}, + ), + # FIXME DISCUSS + # The real q is do we want the Telepath/Storm API endpoints to yield strings + # OR do we want to yield struct log messages as dictionaries ?
+ # Strings are EASY to grab and capture; grabbing the structlog data is + # more complicated. + 'returns': {'name': 'Yields', 'type': 'str'}, + }} ) _storm_lib_path = ('cell',) + # FIXME populate + # _storm_lib_perms = ( + # + # ) def __init__(self, runt, name=()): s_stormtypes.Lib.__init__(self, runt, name=name) @@ -192,6 +211,7 @@ def getObjLocals(self): 'hotFixesCheck': self._hotFixesCheck, 'trimNexsLog': self._trimNexsLog, 'uptime': self._uptime, + 'iterLogs': self._methIterLogs, } @s_stormtypes.stormfunc(readonly=True) @@ -339,3 +359,30 @@ async def _uptime(self, name=None): 'starttime': info['cellstarttime'], 'uptime': info['celluptime'], } + + @s_stormtypes.stormfunc(readonly=True) + async def _methIterLogs(self, name=None): + name = await s_stormtypes.tostr(name, noneok=True) + + # FIXME PERMS DISCUSSION + self.runt.confirm(('storm', 'lib', 'cell', 'iterlogs')) + # if not self.runt.isAdmin(): + # mesg = '$lib.cell.iterLogs() requires admin privs.' + # raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) + + if name is None: + genr = self.runt.snap.core.iterCellLogs() + else: + ssvc = self.runt.snap.core.getStormSvc(name) + if ssvc is None: + # FIXME: FALLBACK to lookup via aha or telepath? + # It sure would be nice to have an easy way to reach the logs from ANYTHING + # but that may be a later scatter/gather support item. 
+ mesg = f'No service with name/iden: {name}' + raise s_exc.NoSuchName(mesg=mesg) + await ssvc.proxy.waitready() + genr = ssvc.proxy.iterCellLogs() + + async for mesg in genr: + yield mesg + await asyncio.sleep(0) diff --git a/synapse/lib/structlog.py b/synapse/lib/structlog.py index daa59d35e7d..1ace1c5506c 100644 --- a/synapse/lib/structlog.py +++ b/synapse/lib/structlog.py @@ -1,9 +1,22 @@ import json import logging +import collections import synapse.common as s_common +DEFAULT_QSIZE = 1024 + +class StreamHandlerWithQueue(logging.StreamHandler): + def __init__(self, stream=None, qsize=DEFAULT_QSIZE): + super().__init__(stream=stream) + self._syn_log_queue = collections.deque(maxlen=qsize) + + def format(self, record: logging.LogRecord) -> str: + mesg = super().format(record) + self._syn_log_queue.append(mesg) + return mesg + class JsonFormatter(logging.Formatter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/synapse/tests/test_lib_stormlib_cell.py b/synapse/tests/test_lib_stormlib_cell.py index a76e56dfa3a..00cd3934483 100644 --- a/synapse/tests/test_lib_stormlib_cell.py +++ b/synapse/tests/test_lib_stormlib_cell.py @@ -1,6 +1,5 @@ import synapse.exc as s_exc -import synapse.lib.aha as s_aha import synapse.lib.coro as s_coro import synapse.lib.const as s_const import synapse.lib.stormlib.cell as s_stormlib_cell @@ -351,3 +350,25 @@ async def test_stormfix_riskhasvuln(self): self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view0', opts={'view': view0})) self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view1', opts={'view': view1})) self.len(1, await core.nodes('risk:vulnerable -> it:host', opts={'view': view2})) + + async def test_stormlib_cell_iterlogs(self): + with self.getAsyncLoggerStream('', '') as stream: + conf = {'storm:log': True} + async with self.getTestCore(conf=conf) as core: + msgs = await core.stormlist('$lib.print(hello)') + msgs = await
core.stormlist('$lib.log.info(hello)') + msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') + self.stormIsInPrint('Executing storm query {$lib.print(hello)} ', msgs) + self.stormIsInPrint('Executing storm query {$lib.log.info(hello)}', msgs) + self.stormIsInPrint('hello', msgs) + + with self.getStructuredAsyncLoggerStream('', '') as stream: + conf = {'storm:log': True} + async with self.getTestCore(conf=conf) as core: + msgs = await core.stormlist('$lib.print(hello)') + msgs = await core.stormlist('$lib.log.info(hello)') + msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') + + self.stormIsInPrint('"message": "Executing storm query {$lib.print(hello)}', msgs) + self.stormIsInPrint('"message": "Executing storm query {$lib.log.info(hello)}', msgs) + self.stormIsInPrint('"message": "hello"', msgs) diff --git a/synapse/tests/utils.py b/synapse/tests/utils.py index a6b6145b5c4..aac84ebfdfd 100644 --- a/synapse/tests/utils.py +++ b/synapse/tests/utils.py @@ -1784,7 +1784,7 @@ def getAsyncLoggerStream(self, logname, mesg='') -> contextlib.AbstractContextMa ''' stream = AsyncStreamEvent() stream.setMesg(mesg) - handler = logging.StreamHandler(stream) + handler = s_structlog.StreamHandlerWithQueue(stream=stream) slogger = logging.getLogger(logname) slogger.addHandler(handler) level = slogger.level @@ -1829,7 +1829,7 @@ def getStructuredAsyncLoggerStream(self, logname, mesg='') -> contextlib.Abstrac ''' stream = AsyncStreamEvent() stream.setMesg(mesg) - handler = logging.StreamHandler(stream) + handler = s_structlog.StreamHandlerWithQueue(stream=stream) slogger = logging.getLogger(logname) formatter = s_structlog.JsonFormatter() handler.setFormatter(formatter)