From 0f6e05234f1338c937a82bda0fe1d799ca1571cc Mon Sep 17 00:00:00 2001 From: epiphyte Date: Wed, 18 Sep 2024 15:00:11 +0000 Subject: [PATCH 1/4] WIP - Initial swag at a queue handler that is hooked into the Cell via initFromArgv, plumb it up into storm (SYN-8012) --- synapse/common.py | 26 +++++++--- synapse/lib/cell.py | 20 ++++++++ synapse/lib/stormlib/cell.py | 42 ++++++++++++++++ synapse/lib/structlog.py | 13 +++++ synapse/tests/test_lib_stormlib_cell.py | 66 +++++++++++++++++++++++++ 5 files changed, 160 insertions(+), 7 deletions(-) diff --git a/synapse/common.py b/synapse/common.py index acb1c3b81b8..e6dd012bbad 100644 --- a/synapse/common.py +++ b/synapse/common.py @@ -776,15 +776,24 @@ def makedirs(path, mode=0o777): def iterzip(*args, fillvalue=None): return itertools.zip_longest(*args, fillvalue=fillvalue) -def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None): +def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None, qsize=None): if structlog: structlog = 'true' else: structlog = 'false' + + if qsize is None: + qsize = s_structlog.DEFAULT_QSIZE + defval = os.getenv('SYN_LOG_LEVEL', defval) datefmt = os.getenv('SYN_LOG_DATEFORMAT', datefmt) structlog = envbool('SYN_LOG_STRUCT', structlog) - ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt} + + qsize = int(os.getenv('SYN_LOG_QSIZE', qsize)) + if qsize < 0: + raise s_exc.BadArg(mesg=f'SYN_LOG_QSIZE must be >=0, got {qsize}') + + ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt, 'qsize': qsize} return ret def normLogLevel(valu): @@ -815,7 +824,7 @@ def normLogLevel(valu): return normLogLevel(valu) raise s_exc.BadArg(mesg=f'Unknown log level type: {type(valu)} {valu}', valu=valu) -def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None): +def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None, qsize=None): ''' Configure synapse logging. @@ -824,15 +833,16 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non defval (str): Default log level. May be an integer. structlog (bool): Enabled structured (jsonl) logging output. datefmt (str): Optional strftime format string. - + qsize (int): Optional queue size for the root logger. Notes: This calls logging.basicConfig and should only be called once per process. Returns: None ''' - ret = _getLogConfFromEnv(defval, structlog, datefmt) + ret = _getLogConfFromEnv(defval, structlog, datefmt, qsize) + qsize = ret.get('qsize') datefmt = ret.get('datefmt') log_level = ret.get('defval') log_struct = ret.get('structlog') @@ -841,13 +851,15 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non log_level = normLogLevel(log_level) + handler = s_structlog.StreamHandlerWithQueue(qsize=qsize) + if log_struct: - handler = logging.StreamHandler() formatter = s_structlog.JsonFormatter(datefmt=datefmt) handler.setFormatter(formatter) logging.basicConfig(level=log_level, handlers=(handler,)) else: - logging.basicConfig(level=log_level, format=s_const.LOG_FORMAT, datefmt=datefmt) + logging.basicConfig(level=log_level, handlers=(handler,), format=s_const.LOG_FORMAT, datefmt=datefmt) + if log_setup: mlogger.info('log level set to %s', s_const.LOG_LEVEL_INVERSE_CHOICES.get(log_level)) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 2e806e1ebd6..8f743376e07 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -16,6 +16,7 @@ import tempfile import functools import contextlib +import collections import multiprocessing import aiohttp @@ -844,6 +845,11 @@ async def getReloadableSystems(self): async def reload(self, subsystem=None): return await self.cell.reload(subsystem=subsystem) + @adminapi(log=True) + async def iterCellLogs(self): + async for mesg in self.cell.iterCellLogs(): + yield mesg + class Cell(s_nexus.Pusher, s_telepath.Aware): ''' A Cell() implements a synapse micro-service. @@ -1111,6 +1117,7 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): self.ahaclient = None self._checkspace = s_coro.Event() self._reloadfuncs = {} # name -> func + self._syn_log_queue = None # type: collections.deque self.nexslock = asyncio.Lock() self.netready = asyncio.Event() @@ -1462,6 +1469,8 @@ async def fini(self): retn = await s_nexus.Pusher.fini(self) if retn == 0: self._onFiniCellGuid() + if self._syn_log_queue is not None: + self._syn_log_queue = None return retn def _onFiniCellGuid(self): @@ -4203,6 +4212,11 @@ async def initFromArgv(cls, argv, outp=None): logger.exception(f'Error starting cell at {opts.dirn}') raise + for handler in logging.getLogger('').handlers: + if hasattr(handler, '_syn_log_queue'): + cell._syn_log_queue = handler._syn_log_queue + break + try: turl = cell._getDmonListen() @@ -4921,3 +4935,9 @@ def getCachedSslCtx(self, opts=None, verify=None): key = tuple(sorted(opts.items())) return self._sslctx_cache.get(key) + + async def iterCellLogs(self): + if self._syn_log_queue is None: + raise s_exc.SynErr(mesg='no queue available') + for mesg in list(self._syn_log_queue): + yield mesg diff --git a/synapse/lib/stormlib/cell.py b/synapse/lib/stormlib/cell.py index 5e450ab04c6..c6e3f2cf981 100644 --- a/synapse/lib/stormlib/cell.py +++ b/synapse/lib/stormlib/cell.py @@ -171,8 +171,22 @@ class CellLib(s_stormtypes.Lib): '(defaults to the Cortex if not provided).'}, ), 'returns': {'type': 'dict', 'desc': 'A dictionary containing uptime data.', }}}, + {'name': 'iterLogs', 'desc': 'Iterate over recent service logs.', + 'type': {'type': 'function', '_funcname': '_methIterLogs', + 'args': ( + {'name': 'name', 'type': 'str', 'default': None, + # FIXME This may need to accomdate other aha / telepath values + 'desc': 'The name, or iden, of the service to get uptime data for ' + '(defaults to the Cortex if not provided).'}, + ), + 'returns': {'name': 'Yields', 'type': 'str'}, + }} ) _storm_lib_path = ('cell',) + # FIXME populater + # _storm_lib_perms = ( + # + # ) def getObjLocals(self): return { @@ -185,6 +199,7 @@ def getObjLocals(self): 'hotFixesCheck': self._hotFixesCheck, 'trimNexsLog': self._trimNexsLog, 'uptime': self._uptime, + 'iterLogs': self._methIterLogs, } async def _hotFixesApply(self): @@ -328,3 +343,30 @@ async def _uptime(self, name=None): 'starttime': info['cellstarttime'], 'uptime': info['celluptime'], } + + @s_stormtypes.stormfunc(readonly=True) + async def _methIterLogs(self, name=None): + name = await s_stormtypes.tostr(name, noneok=True) + + # FIXME PERMS DISCUSSION + self.runt.confirm(('storm', 'lib', 'cell', 'iterlogs')) + # if not self.runt.isAdmin(): + # mesg = '$lib.cell.iterLogs() requires admin privs.' + # raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) + + if name is None: + genr = self.runt.snap.core.iterCellLogs() + else: + ssvc = self.runt.snap.core.getStormSvc(name) + if ssvc is None: + # FIXME: FALLBACK to lookup via aha or telepath? + # It sure would be nice to have an easy way to reach the logs from ANYTHING + # but that may be a later scatter/gather support item. + mesg = f'No service with name/iden: {name}' + raise s_exc.NoSuchName(mesg=mesg) + await ssvc.proxy.waitready() + genr = ssvc.proxy.iterCellLogs() + + async for mesg in genr: + yield mesg + await asyncio.sleep(0) diff --git a/synapse/lib/structlog.py b/synapse/lib/structlog.py index daa59d35e7d..1624054d345 100644 --- a/synapse/lib/structlog.py +++ b/synapse/lib/structlog.py @@ -1,9 +1,22 @@ import json import logging +import collections import synapse.common as s_common +DEFAULT_QSIZE = 1024 + +class StreamHandlerWithQueue(logging.StreamHandler): + def __init__(self, stream=None, qsize=DEFAULT_QSIZE): + super().__init__(stream=stream) + self._syn_log_queue = collections.deque(maxlen=qsize) + + def format(self, record: logging.LogRecord) -> str: + mesg = super().format(record) + self._syn_log_queue.append(mesg) + return mesg + class JsonFormatter(logging.Formatter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/synapse/tests/test_lib_stormlib_cell.py b/synapse/tests/test_lib_stormlib_cell.py index 95732aa4934..2960087cce2 100644 --- a/synapse/tests/test_lib_stormlib_cell.py +++ b/synapse/tests/test_lib_stormlib_cell.py @@ -1,8 +1,14 @@ + +import logging + +import contextlib + import synapse.exc as s_exc import synapse.lib.aha as s_aha import synapse.lib.coro as s_coro import synapse.lib.const as s_const +import synapse.lib.structlog as s_structlog import synapse.lib.stormlib.cell as s_stormlib_cell import synapse.tests.utils as s_test @@ -348,3 +354,63 @@ async def test_stormfix_riskhasvuln(self): self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view0', opts={'view': view0})) self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view1', opts={'view': view1})) self.len(1, await core.nodes('risk:vulnerable -> it:host', opts={'view': view2})) + + @contextlib.contextmanager + def getAsyncLoggerStreamAndHandler(self, logname, mesg=''): + stream = s_test.AsyncStreamEvent() + stream.setMesg(mesg) + handler = s_structlog.StreamHandlerWithQueue(stream) + slogger = logging.getLogger(logname) + slogger.addHandler(handler) + level = slogger.level + slogger.setLevel('DEBUG') + try: + yield stream, handler + except Exception: # pragma: no cover + raise + finally: + slogger.removeHandler(handler) + slogger.setLevel(level) + + @contextlib.contextmanager + def getAsyncStructLoggerStreamAndHandler(self, logname, mesg=''): + stream = s_test.AsyncStreamEvent() + stream.setMesg(mesg) + handler = s_structlog.StreamHandlerWithQueue(stream) + formatter = s_structlog.JsonFormatter() + handler.setFormatter(formatter) + slogger = logging.getLogger(logname) + slogger.addHandler(handler) + level = slogger.level + slogger.setLevel('DEBUG') + try: + yield stream, handler + except Exception: # pragma: no cover + raise + finally: + slogger.removeHandler(handler) + slogger.setLevel(level) + + async def test_stormlib_cell_iterlogs(self): + with self.getAsyncLoggerStreamAndHandler('', '') as (stream, handler): + conf = {'storm:log': True} + async with self.getTestCore(conf=conf) as core: + core._syn_log_queue = handler._syn_log_queue + msgs = await core.stormlist('$lib.print(hello)') + msgs = await core.stormlist('$lib.log.info(hello)') + msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') + self.stormIsInPrint('Executing storm query {$lib.print(hello)} ', msgs) + self.stormIsInPrint('Executing storm query {$lib.log.info(hello)}', msgs) + self.stormIsInPrint('hello', msgs) + + with self.getAsyncStructLoggerStreamAndHandler('', '') as (stream, handler): + conf = {'storm:log': True} + async with self.getTestCore(conf=conf) as core: + core._syn_log_queue = handler._syn_log_queue + msgs = await core.stormlist('$lib.print(hello)') + msgs = await core.stormlist('$lib.log.info(hello)') + msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') + + self.stormIsInPrint('"message": "Executing storm query {$lib.print(hello)}', msgs) + self.stormIsInPrint('"message": "Executing storm query {$lib.log.info(hello)}', msgs) + self.stormIsInPrint('"message": "hello"', msgs) From 35d690ea3b79fc94ccd9e0ab1b4e36ea9c49e829 Mon Sep 17 00:00:00 2001 From: epiphyte Date: Wed, 18 Sep 2024 18:14:00 +0000 Subject: [PATCH 2/4] Cleanup implementation to always grab the queue reference if we find it in the root logger. --- synapse/lib/cell.py | 12 +++--- synapse/tests/test_lib_stormlib_cell.py | 49 +------------------------ synapse/tests/utils.py | 4 +- 3 files changed, 10 insertions(+), 55 deletions(-) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 8f743376e07..f952cd9686e 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -1117,11 +1117,16 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): self.ahaclient = None self._checkspace = s_coro.Event() self._reloadfuncs = {} # name -> func - self._syn_log_queue = None # type: collections.deque self.nexslock = asyncio.Lock() self.netready = asyncio.Event() + self._syn_log_queue = None # type: collections.deque + for handler in logging.getLogger('').handlers: + if hasattr(handler, '_syn_log_queue'): + self._syn_log_queue = handler._syn_log_queue + break + self.conf = self._initCellConf(conf) self.minfree = self.conf.get('limit:disk:free') @@ -4212,11 +4217,6 @@ async def initFromArgv(cls, argv, outp=None): logger.exception(f'Error starting cell at {opts.dirn}') raise - for handler in logging.getLogger('').handlers: - if hasattr(handler, '_syn_log_queue'): - cell._syn_log_queue = handler._syn_log_queue - break - try: turl = cell._getDmonListen() diff --git a/synapse/tests/test_lib_stormlib_cell.py b/synapse/tests/test_lib_stormlib_cell.py index 2960087cce2..e886817730e 100644 --- a/synapse/tests/test_lib_stormlib_cell.py +++ b/synapse/tests/test_lib_stormlib_cell.py @@ -1,14 +1,7 @@ - -import logging - -import contextlib - import synapse.exc as s_exc -import synapse.lib.aha as s_aha import synapse.lib.coro as s_coro import synapse.lib.const as s_const -import synapse.lib.structlog as s_structlog import synapse.lib.stormlib.cell as s_stormlib_cell import synapse.tests.utils as s_test @@ -355,47 +348,10 @@ async def test_stormfix_riskhasvuln(self): self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view1', opts={'view': view1})) self.len(1, await core.nodes('risk:vulnerable -> it:host', opts={'view': view2})) - @contextlib.contextmanager - def getAsyncLoggerStreamAndHandler(self, logname, mesg=''): - stream = s_test.AsyncStreamEvent() - stream.setMesg(mesg) - handler = s_structlog.StreamHandlerWithQueue(stream) - slogger = logging.getLogger(logname) - slogger.addHandler(handler) - level = slogger.level - slogger.setLevel('DEBUG') - try: - yield stream, handler - except Exception: # pragma: no cover - raise - finally: - slogger.removeHandler(handler) - slogger.setLevel(level) - - @contextlib.contextmanager - def getAsyncStructLoggerStreamAndHandler(self, logname, mesg=''): - stream = s_test.AsyncStreamEvent() - stream.setMesg(mesg) - handler = s_structlog.StreamHandlerWithQueue(stream) - formatter = s_structlog.JsonFormatter() - handler.setFormatter(formatter) - slogger = logging.getLogger(logname) - slogger.addHandler(handler) - level = slogger.level - slogger.setLevel('DEBUG') - try: - yield stream, handler - except Exception: # pragma: no cover - raise - finally: - slogger.removeHandler(handler) - slogger.setLevel(level) - async def test_stormlib_cell_iterlogs(self): - with self.getAsyncLoggerStreamAndHandler('', '') as (stream, handler): + with self.getAsyncLoggerStream('', '') as stream: conf = {'storm:log': True} async with self.getTestCore(conf=conf) as core: - core._syn_log_queue = handler._syn_log_queue msgs = await core.stormlist('$lib.print(hello)') msgs = await core.stormlist('$lib.log.info(hello)') msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') @@ -403,10 +359,9 @@ async def test_stormlib_cell_iterlogs(self): self.stormIsInPrint('Executing storm query {$lib.log.info(hello)}', msgs) self.stormIsInPrint('hello', msgs) - with self.getAsyncStructLoggerStreamAndHandler('', '') as (stream, handler): + with self.getStructuredAsyncLoggerStream('', '') as stream: conf = {'storm:log': True} async with self.getTestCore(conf=conf) as core: - core._syn_log_queue = handler._syn_log_queue msgs = await core.stormlist('$lib.print(hello)') msgs = await core.stormlist('$lib.log.info(hello)') msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }') diff --git a/synapse/tests/utils.py b/synapse/tests/utils.py index 1915fb9ea2b..3ab49f558f1 100644 --- a/synapse/tests/utils.py +++ b/synapse/tests/utils.py @@ -1722,7 +1722,7 @@ def getAsyncLoggerStream(self, logname, mesg=''): ''' stream = AsyncStreamEvent() stream.setMesg(mesg) - handler = logging.StreamHandler(stream) + handler = s_structlog.StreamHandlerWithQueue(stream=stream) slogger = logging.getLogger(logname) slogger.addHandler(handler) level = slogger.level @@ -1769,7 +1769,7 @@ def getStructuredAsyncLoggerStream(self, logname, mesg=''): ''' stream = AsyncStreamEvent() stream.setMesg(mesg) - handler = logging.StreamHandler(stream) + handler = s_structlog.StreamHandlerWithQueue(stream=stream) slogger = logging.getLogger(logname) formatter = s_structlog.JsonFormatter() handler.setFormatter(formatter) From a36e209343872c0b0adfa04ded617cb407d864c7 Mon Sep 17 00:00:00 2001 From: epiphyte Date: Thu, 19 Sep 2024 15:15:38 +0000 Subject: [PATCH 3/4] comment --- synapse/lib/stormlib/cell.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/lib/stormlib/cell.py b/synapse/lib/stormlib/cell.py index c6e3f2cf981..baf17d69d86 100644 --- a/synapse/lib/stormlib/cell.py +++ b/synapse/lib/stormlib/cell.py @@ -179,11 +179,16 @@ class CellLib(s_stormtypes.Lib): 'desc': 'The name, or iden, of the service to get uptime data for ' '(defaults to the Cortex if not provided).'}, ), + # FIXME DISCUSS + # The real q is do we want the Telepath/Storm API endpoints to yield strings + # OR do we want to yield struct log messages as dictionaries ? + # Strings are EASY to grab and capture; grabbing the structlog data is + # more complicated. 'returns': {'name': 'Yields', 'type': 'str'}, }} ) _storm_lib_path = ('cell',) - # FIXME populater + # FIXME populate # _storm_lib_perms = ( # # ) From 2e808cfefde230c5abe753945bc53f8727660ba4 Mon Sep 17 00:00:00 2001 From: epiphyte Date: Fri, 20 Sep 2024 13:38:38 +0000 Subject: [PATCH 4/4] Call format directly --- synapse/lib/structlog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/lib/structlog.py b/synapse/lib/structlog.py index 1624054d345..1ace1c5506c 100644 --- a/synapse/lib/structlog.py +++ b/synapse/lib/structlog.py @@ -13,7 +13,7 @@ def __init__(self, stream=None, qsize=DEFAULT_QSIZE): self._syn_log_queue = collections.deque(maxlen=qsize) def format(self, record: logging.LogRecord) -> str: - mesg = super().format(record) + mesg = self.format(record) self._syn_log_queue.append(mesg) return mesg