diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index adcd1dc5b76..23c187aa296 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -10,12 +10,14 @@ import asyncio import logging import tarfile +import weakref import argparse import datetime import platform import tempfile import functools import contextlib +import collections import multiprocessing import aiohttp @@ -55,6 +57,7 @@ import synapse.lib.version as s_version import synapse.lib.lmdbslab as s_lmdbslab import synapse.lib.thisplat as s_thisplat +import synapse.lib.structlog as s_structlog import synapse.lib.crypto.passwd as s_passwd @@ -192,6 +195,29 @@ def _iterBackupProc(path, linkinfo): logger.info(f'Backup streaming process for [{path}] starting.') asyncio.run(_iterBackupWork(path, linkinfo)) +class LogHandler(logging.Handler): + + def __init__(self, cell): + logging.Handler.__init__(self) + self.cell = cell + + # TODO This needs some help untangling the multiple ways to + # specify log levels to the Cell... + + logconf = cell.conf.get('_log_conf', {}) + # how did "defval" become the loglevel key? + loglevel = logconf.get('defval') + if loglevel is None: + loglevel = logging.WARNING + + self.setLevel(loglevel) + self.setFormatter(s_structlog.JsonFormatter()) + + def emit(self, record): + # async needed to append to windows, but firing a task + # seems like overkill... + self.cell.addLogRec(self.format(record)) + class CellApi(s_base.Base): async def __anit__(self, cell, link, user): @@ -890,6 +916,10 @@ async def getReloadableSystems(self): async def reload(self, subsystem=None): return await self.cell.reload(subsystem=subsystem) + @adminapi(log=True) + async def getLogs(self, size=100): + return self.cell.getLogs(size=size) + class Cell(s_nexus.Pusher, s_telepath.Aware): ''' A Cell() implements a synapse micro-service. @@ -1238,6 +1268,7 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): await s_nexus.Pusher.__anit__(self, self.iden) + self._initLogs() self._initCertDir() await self.enter_context(s_telepath.loadTeleCell(self.dirn)) @@ -1473,6 +1504,57 @@ async def _storCellAuthMigration(self): logger.warning(f'...Cell ({self.getCellType()}) auth migration complete!') + def _initLogs(self): + # setup cell log capture + self._log_todo = [] + self._log_fifo = collections.deque(maxlen=10000) + self._log_fill = asyncio.Event() + self._log_windows = weakref.WeakSet() + + handler = LogHandler(self) + logging.getLogger().addHandler(handler) + def _finiLogs(): + self._log_fill.set() + logging.getLogger().removeHandler(handler) + + self.onfini(_finiLogs) + self.schedCoro(self._logWindowFiller()) + + # TODO: add peers=True and use AHA gather + def getLogs(self, size=100): + return self._log_fifo[-size:] + + # TODO: add peers=True and use AHA gather and mergesort + async def getLogTail(self): + + async with await s_queue.Window.anit(maxsize=1000) as wind: + self._log_windows.add(wind) + + for logrec in wind: + yield logrec + + def addLogRec(self, logrec): + self._log_fifo.append(logrec) + self._log_todo.append(logrec) + self._log_fill.set() + + async def _logWindowFiller(self): + + while not self.isfini: + + await self._log_fill.wait() + if self.isfini: + break + + todo = list(self._log_todo) + + self._log_todo.clear() + self._log_fill.clear() + + if todo: + for window in self._log_windows: + await window.puts(todo) + def getPermDef(self, perm): perm = tuple(perm) if self.permlook is None: