diff --git a/.circleci/config.yml b/.circleci/config.yml index 551442a8c74..8fb97177719 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -626,6 +626,7 @@ workflows: branches: only: - master + - visi-logging-epiphyte-fb - build_docker_tag: requires: diff --git a/changes/7ef2889edb0c2bfb214207b97f0394db.yaml b/changes/7ef2889edb0c2bfb214207b97f0394db.yaml new file mode 100644 index 00000000000..7d9522d6a4d --- /dev/null +++ b/changes/7ef2889edb0c2bfb214207b97f0394db.yaml @@ -0,0 +1,7 @@ +--- +desc: Deprecated ``synapse.lib.structlog.JsonFormatter``. It will be removed after + April 1st, 2026. +desc:literal: false +prs: [] +type: deprecation +... diff --git a/docs/synapse/devopsguide.rst b/docs/synapse/devopsguide.rst index b925847d721..8096d2d6de8 100644 --- a/docs/synapse/devopsguide.rst +++ b/docs/synapse/devopsguide.rst @@ -346,46 +346,72 @@ These structured logs are designed to be easy to ingest into third party log col message, level, time, and metadata about where the log message came from:: { - "message": "log level set to INFO", + "ahaservice": "00.cortex.synapse", + "message": "Executing storm query {[inet:asn=1234]} as [someUsername]", "logger": { - "name": "synapse.lib.cell", + "name": "synapse.storm", + "func": "_logStormQuery", "process": "MainProcess", - "filename": "common.py", - "func": "setlogging" + "thread": "MainThread" }, "level": "INFO", - "time": "2021-06-28 15:47:54,825" + "time": "2026-02-13 10:38:24,545", + "user": "e3532bc88fa66afb592e6a1474a98675", + "username": "someUsername", + "params": { + "mode": "storm", + "view": "715a5c9ae37e4045795ea3f3cabb44fb", + "text": "[inet:asn=1234]", + "hash": "ef94e89eb3bc309a40242876f6c5f296" + } } +The ``user`` and ``username`` fields at the top level of a log correspond to the currently active / authorized API user +which has caused a log event to occur. The ``ahaservice`` key, if present, indicates the AHA service that is associated +with the log message. 
+ When exceptions are logged with structured logging, we capture additional information about the exception, including the -entire traceback. In the event that the error is a Synapse Err class, we also capture additional metadata which was -attached to the error. In the following example, we also have the query text, username and user iden available in the -log message pretty-printed log message:: +traceback as structured data. In the event that the error is a Synapse Err class, we also capture additional metadata +which was attached to the error. In the following example, we also have the query text, username and user iden available +in the pretty-printed log message. The ``key=valu`` data that was raised by the user is also included in the +``error.info`` dictionary. Cause and Context information may be present, corresponding to the Python ``__cause__`` and +``__context__`` attributes on Exception classes:: { - "message": "Error during storm execution for { || }", + "ahaservice": "00.cortex.synapse", + "message": "Error during storm execution for { $lib.raise(Newp, 'ruh roh', key=valu) }", "logger": { "name": "synapse.lib.view", + "func": "runStorm", "process": "MainProcess", - "filename": "view.py", - "func": "runStorm" + "thread": "MainThread" }, "level": "ERROR", - "time": "2021-06-28 15:49:34,401", - "err": { - "efile": "coro.py", - "eline": 233, - "esrc": "return await asyncio.get_running_loop().run_in_executor(forkpool, _runtodo, todo)", - "ename": "forked", - "at": 1, - "text": "||", - "mesg": "No terminal defined for '|' at line 1 col 2. Expecting one of: #, $, (, *, + or -, -(, -+>, -->, ->, :, <(, <+-, <-, <--, [, break, command name, continue, fini, for, function, if, init, property name, return, switch, while, whitespace or comment, yield, {", - "etb": ".... 
long traceback ...", "errname": "BadSyntax" + "time": "2026-02-13 11:24:06,853", + "user": "e3532bc88fa66afb592e6a1474a98675", + "username": "someUsername", + "error": { + "code": "StormRaise", + "traceback": [ + ... list of traceback frames ... + [ + "/home/epiphyte/PycharmProjects/synapse/synapse/lib/stormtypes.py", + 1751, + "_raise", + "raise s_exc.StormRaise(**info)" + ] + ], + "info": { + "key": "valu", + "errname": "Newp", + ... additional error information ... + } + }, + "mesg": "ruh roh" }, - "text": "||", - "username": "root", - "user": "3189065f95d3ab0a6904e604260c0be2" + "params": { + "text": "$lib.raise(Newp, 'ruh roh', key=valu)", + } } Custom date formatting strings can also be provided by setting the ``SYN_LOG_DATEFORMAT`` string. This is expected to be a diff --git a/synapse/common.py b/synapse/common.py index ff6b5f1a0e4..9bad0bdac02 100644 --- a/synapse/common.py +++ b/synapse/common.py @@ -41,7 +41,6 @@ import synapse.lib.const as s_const import synapse.lib.logging as s_logging import synapse.lib.msgpack as s_msgpack -import synapse.lib.structlog as s_structlog import synapse.vendor.cpython.lib.ipaddress as ipaddress import synapse.vendor.cpython.lib.http.cookies as v_cookies diff --git a/synapse/cortex.py b/synapse/cortex.py index 0bf9769fb6b..155115672b7 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -1058,7 +1058,7 @@ async def initServiceStorage(self): mesg = f'User {useriden} ({user.name}) has a rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' - logger.warning(mesg, extra=self.getLogExtra(user=useriden, username=user.name)) + logger.warning(mesg) for roleiden in ag.gateroles.keys(): role = self.auth.role(roleiden) if role is None: @@ -1066,7 +1066,7 @@ async def initServiceStorage(self): mesg = f'Role {roleiden} ({role.name}) has a rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' 
- logger.warning(mesg, extra=self.getLogExtra(role=roleiden, rolename=role.name)) + logger.warning(mesg) self._initVaults() @@ -6449,8 +6449,7 @@ async def _getStormEval(self, text): try: astvalu = copy.deepcopy(await s_parser.evalcache.aget(text)) except s_exc.FatalErr: # pragma: no cover - extra = self.getLogExtra(text=text) - logger.exception(f'Fatal error while parsing [{text}]', extra=extra) + logger.exception(f'Fatal error while parsing [{text}]', extra=self.getLogExtra(text=text)) await self.fini() raise astvalu.init(self) @@ -6460,8 +6459,7 @@ async def _getStormQuery(self, args): try: query = copy.deepcopy(await s_parser.querycache.aget(args)) except s_exc.FatalErr: # pragma: no cover - extra = self.getLogExtra(text=args[0]) - logger.exception(f'Fatal error while parsing [{args}]', extra=extra) + logger.exception(f'Fatal error while parsing [{args}]', extra=self.getLogExtra(text=args[0])) await self.fini() raise query.init(self) @@ -6511,12 +6509,9 @@ def _logStormQuery(self, text, user, info=None): if info is None: info = {} info['text'] = text - info['username'] = user.name - info['user'] = user.iden info['hash'] = s_storm.queryhash(text) - extra = s_logging.getLogExtra(**info) stormlogger.log(self.stormloglvl, 'Executing storm query {%s} as [%s]', text, user.name, - extra=extra) + extra=self.getLogExtra(**info)) async def getNodeByNdef(self, ndef, view=None): ''' diff --git a/synapse/datamodel.py b/synapse/datamodel.py index 741af1d580e..794ded1dde3 100644 --- a/synapse/datamodel.py +++ b/synapse/datamodel.py @@ -16,6 +16,7 @@ import synapse.lib.types as s_types import synapse.lib.dyndeps as s_dyndeps import synapse.lib.grammar as s_grammar +import synapse.lib.logging as s_logging import synapse.lib.msgpack as s_msgpack logger = logging.getLogger(__name__) @@ -948,7 +949,6 @@ def addType(self, typename, basename, typeopts, typeinfo, checks=True): def _checkTypeDef(self, typ): if 'comp' in typ.info.get('bases', ()): for fname, ftypename in 
typ.opts.get('fields', ()): - extra = {'synapse': {'type': typ.name, 'field': fname}} if isinstance(ftypename, (list, tuple)): ftypename = ftypename[0] @@ -957,7 +957,7 @@ def _checkTypeDef(self, typ): ftype = typ.tcache[fname] except s_exc.BadTypeDef: mesg = f'The {typ.name} field {fname} is declared as a type ({ftypename}) that does not exist.' - logger.warning(mesg, extra=extra) + logger.warning(mesg, s_logging.getLogExtra(type=typ.name, field=fname)) continue # We're only interested in extended model comp types @@ -966,12 +966,11 @@ def _checkTypeDef(self, typ): if ftype.ismutable: mesg = f'Comp types with mutable fields ({typ.name}:{fname}) are deprecated and will be removed in 3.0.0.' - logger.warning(mesg, extra=extra) + logger.warning(mesg, s_logging.getLogExtra(type=typ.name, field=fname)) if ftype.deprecated: mesg = f'The type {typ.name} field {fname} uses a deprecated type {ftype.name}.' - extra['synapse']['field:type'] = ftype.name - logger.warning(mesg, extra=extra) + logger.warning(mesg, s_logging.getLogExtra(type=typ.name, field=fname, field_type=ftype.name)) def addForm(self, formname, forminfo, propdefs, checks=True): diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 9f239832dea..b4c513900d6 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -14,6 +14,7 @@ import synapse.lib.base as s_base import synapse.lib.coro as s_coro +import synapse.lib.scope as s_scope import synapse.lib.logging as s_logging # Agenda: manages running one-shot and periodic tasks in the future ("appointments") @@ -832,22 +833,18 @@ async def runloop(self): if appt.isrunning: # pragma: no cover mesg = f'Appointment {appt.iden} {appt.name} is still running from previous time when scheduled' \ f' to run. Skipping.' 
- logger.warning(mesg, - extra={'synapse': {'iden': appt.iden, 'name': appt.name}}) + logger.warning(mesg, extra=self.core.getLogExtra(iden=appt.iden, name=appt.name)) else: try: await self._execute(appt) except Exception as e: - extra = {'iden': appt.iden, 'name': appt.name, 'user': appt.creator, 'view': appt.view} - user = self.core.auth.user(appt.creator) - if user is not None: - extra['username'] = user.name + extra = {'iden': appt.iden, 'name': appt.name, 'view': appt.view} if isinstance(e, s_exc.SynErr): mesg = e.get('mesg', str(e)) else: # pragma: no cover mesg = str(e) logger.exception(f'Agenda error running appointment {appt.iden} {appt.name}: {mesg}', - extra={'synapse': extra}) + extra=self.core.getLogExtra(**extra)) await self._markfailed(appt, f'error: {e}') async def _execute(self, appt): @@ -857,23 +854,21 @@ async def _execute(self, appt): user = self.core.auth.user(appt.creator) if user is None: logger.warning(f'Unknown user {appt.creator} in stored appointment {appt.iden} {appt.name}', - extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': appt.creator}}) + extra=self.core.getLogExtra(iden=appt.iden, name=appt.name, user=appt.creator)) await self._markfailed(appt, 'unknown user') return locked = user.info.get('locked') if locked: logger.warning(f'Cron {appt.iden} {appt.name} failed because creator {user.name} is locked', - extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': appt.creator, - 'username': user.name}}) + extra=self.core.getLogExtra(iden=appt.iden, name=appt.name)) await self._markfailed(appt, 'locked user') return view = self.core.getView(iden=appt.view, user=user) if view is None: logger.warning(f'Unknown view {appt.view} in stored appointment {appt.iden} {appt.name}', - extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': appt.creator, - 'username': user.name, 'view': appt.view}}) + extra=self.core.getLogExtra(iden=appt.iden, name=appt.name, view=appt.view)) await self._markfailed(appt, 'unknown 
view') return @@ -881,6 +876,7 @@ async def _execute(self, appt): coro = self._runJob(user, appt) task = self.core.runActiveTask(coro) + task._syn_scope['user'] = user appt.task = await self.core.boss.promotetask(task, f'Cron {appt.iden}', user, info=info) async def fini(): @@ -912,8 +908,7 @@ async def _runJob(self, user, appt): await self.core.addCronEdits(appt.iden, edits) logger.info(f'Agenda executing for iden={appt.iden}, name={appt.name} user={user.name}, view={appt.view}, query={appt.query}', - extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': user.iden, 'text': appt.query, - 'username': user.name, 'view': appt.view}}) + extra=self.core.getLogExtra(iden=appt.iden, name=appt.name, text=appt.query, view=appt.iden)) starttime = self._getNowTick() success = False @@ -940,8 +935,10 @@ async def _runJob(self, user, appt): elif mesg[0] == 'warn' and loglevel <= logging.WARNING: text = mesg[1].get('mesg', '') - extra = self.core.getLogExtra(cron=appt.iden, **mesg[1]) - logger.warning(f'Cron job {appt.iden} issued warning: {text}', extra=extra) + _params = mesg[1] + _params['iden'] = appt.iden + logger.warning(f'Cron job {appt.iden} issued warning: {text}', + extra=self.core.getLogExtra(**_params)) elif mesg[0] == 'err': excname, errinfo = mesg[1] @@ -957,7 +954,7 @@ async def _runJob(self, user, appt): except Exception as e: result = f'raised exception {e}' logger.exception(f'Agenda job {appt.iden} {appt.name} raised exception', - extra={'synapse': {'iden': appt.iden, 'name': appt.name}} + extra=self.core.getLogExtra(iden=appt.iden, name=appt.name), ) else: success = True @@ -981,8 +978,7 @@ async def _runJob(self, user, appt): f'took {took:.3f}s' if not self.core.isactive: mesg = mesg + ' Agenda status will not be saved since the Cortex is no longer the leader.' 
- logger.info(mesg, extra={'synapse': {'iden': appt.iden, 'name': appt.name, 'user': user.iden, - 'result': result, 'username': user.name, 'took': took}}) + logger.info(mesg, extra=self.core.getLogExtra(iden=appt.iden, name=appt.name, result=result, took=took)) edits = { 'lastfinishtime': finishtime, 'isrunning': False, diff --git a/synapse/lib/auth.py b/synapse/lib/auth.py index 86f8ef411d1..646db6d52de 100644 --- a/synapse/lib/auth.py +++ b/synapse/lib/auth.py @@ -1458,8 +1458,7 @@ async def tryPasswd(self, passwd, nexs=True, enforce_policy=True): if expires >= s_common.now(): if await s_passwd.checkShadowV2(passwd=passwd, shadow=shadow): await self.auth.setUserInfo(self.iden, 'onepass', None) - logger.debug(f'Used one time password for {self.name}', - extra={'synapse': {'user': self.iden, 'username': self.name}}) + logger.debug(f'Used one time password for {self.name}') return True else: # Backwards compatible password handling @@ -1467,8 +1466,7 @@ async def tryPasswd(self, passwd, nexs=True, enforce_policy=True): if expires >= s_common.now(): if s_common.guid((params, passwd)) == hashed: await self.auth.setUserInfo(self.iden, 'onepass', None) - logger.debug(f'Used one time password for {self.name}', - extra={'synapse': {'user': self.iden, 'username': self.name}}) + logger.debug(f'Used one time password for {self.name}') return True shadow = self.info.get('passwd') @@ -1493,15 +1491,15 @@ async def tryPasswd(self, passwd, nexs=True, enforce_policy=True): if self.iden == self.auth.rootuser.iden: mesg = f'User {self.name} has exceeded the number of allowed password attempts ({valu + 1}),. Cannot lock {self.name} user.' 
- extra = {'synapse': {'target_user': self.iden, 'target_username': self.name, }} - logger.error(mesg, extra=extra) + logger.error(mesg, extra=self.auth.nexsroot.cell.getLogExtra(target_user=self.iden, target_username=self.name)) return False await self.auth.nexsroot.cell.setUserLocked(self.iden, True) - mesg = f'User {self.name} has exceeded the number of allowed password attempts ({valu + 1}), locking their account.' - extra = {'synapse': {'target_user': self.iden, 'target_username': self.name, 'status': 'MODIFY'}} - logger.warning(mesg, extra=extra) + mesg = f'User {self.name} has exceeded the number of allowed password attempts ({valu + 1}), locked their account.' + logger.warning(mesg, extra=self.auth.nexsroot.cell.getLogExtra(target_user=self.iden, + target_username=self.name, + status='MODIFY')) return False @@ -1510,8 +1508,7 @@ async def tryPasswd(self, passwd, nexs=True, enforce_policy=True): # Backwards compatible password handling salt, hashed = shadow if s_common.guid((salt, passwd)) == hashed: - logger.debug(f'Migrating password to shadowv2 format for user {self.name}', - extra={'synapse': {'user': self.iden, 'username': self.name}}) + logger.debug(f'Migrating password to shadowv2 format for user {self.name}') # Update user to new password hashing scheme. We cannot enforce policy # when migrating an existing password. 
await self.setPasswd(passwd=passwd, nexs=nexs, enforce_policy=False) diff --git a/synapse/lib/base.py b/synapse/lib/base.py index 0dd73850ab9..8f682409f9d 100644 --- a/synapse/lib/base.py +++ b/synapse/lib/base.py @@ -18,6 +18,7 @@ import synapse.lib.coro as s_coro import synapse.lib.scope as s_scope +import synapse.lib.logging as s_logging logger = logging.getLogger(__name__) @@ -599,6 +600,9 @@ async def main(self, timeout=BASE_MAIN_BG_TASK_TIMEOUT): # pragma: no cover ''' await self.addSignalHandlers() await self.waitfini() + # shutdown logging to allow it to drain any queued messages it has, swapping in a stream handler, + # and then cancellling the task so we do not have to await the pump task in bg tasks. + await s_logging.shutdown() await s_coro.await_bg_tasks(timeout) def waiter(self, count, *names, timeout=None): diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index aa067fb14b7..c7d268af177 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -107,7 +107,7 @@ def wrapped(self, *args, **kwargs): user=self.user.iden, username=self.user.name) if log: extra = s_logging.getLogExtra(func=func.__qualname__, args=args, kwargs=kwargs) - logger.info(f'Admin API invoked', extra=extra) + logger.info(f'Admin API invoked api={func.__qualname__}', extra=extra) return func(self, *args, **kwargs) @@ -228,6 +228,7 @@ async def logs(self, last=100): async def watch(self, last=100): async for item in self.cell.watch(last=last): yield item + await asyncio.sleep(0) async def allowed(self, perm, default=None): ''' @@ -3504,16 +3505,16 @@ def _log_web_request(self, handler: s_httpapi.Handler) -> None: if headers: enfo['headers'] = headers + extra = s_logging.getLogExtra(**enfo) + # It is possible that a Cell implementor may register handlers which # do not derive from our Handler class, so we have to handle that. 
if hasattr(handler, 'web_useriden') and handler.web_useriden: user = handler.web_useriden - enfo['user'] = user + extra['loginfo'].setdefault('user', user) if hasattr(handler, 'web_username') and handler.web_username: username = handler.web_username - enfo['username'] = username - - extra = s_logging.getLogExtra(**enfo) + extra['loginfo'].setdefault('username', username) if user: mesg = f'{status} {handler.request.method} {uri} ({remote_ip}) user={user} ({username}) {request_time:.2f}ms' @@ -4576,8 +4577,9 @@ async def _getCellUser(self, link, mesg): # passwd None always fails... passwd = info.get('passwd') - if not await user.tryPasswd(passwd): - raise s_exc.AuthDeny(mesg='Invalid password', username=user.name, user=user.iden) + with s_scope.enter({'user': user}): + if not await user.tryPasswd(passwd): + raise s_exc.AuthDeny(mesg='Invalid password', username=user.name, user=user.iden) return user diff --git a/synapse/lib/cmd.py b/synapse/lib/cmd.py index 265bfd566d9..e72f686c1e2 100644 --- a/synapse/lib/cmd.py +++ b/synapse/lib/cmd.py @@ -6,6 +6,7 @@ import synapse.lib.coro as s_coro import synapse.lib.output as s_output +import synapse.lib.logging as s_logging class Parser(argparse.ArgumentParser): ''' @@ -48,8 +49,6 @@ async def wrapmain(func, logconf=None): # pragma: no cover if logconf is None: logconf = {'level': 'DEBUG'} - import synapse.lib.logging as s_logging - s_logging.setup(**logconf) try: @@ -63,6 +62,7 @@ async def wrapmain(func, logconf=None): # pragma: no cover return 1 finally: + await s_logging.shutdown() await s_coro.await_bg_tasks(timeout=10) def exitmain(func, logconf=None): # pragma: no cover diff --git a/synapse/lib/coro.py b/synapse/lib/coro.py index b9d013283d7..edfec3ba2a0 100644 --- a/synapse/lib/coro.py +++ b/synapse/lib/coro.py @@ -12,8 +12,6 @@ import synapse.glob as s_glob import synapse.common as s_common -import synapse.lib.logging as s_logging - def iscoro(item): return inspect.iscoroutine(item) diff --git a/synapse/lib/httpapi.py 
b/synapse/lib/httpapi.py index e24017705da..5b2186dfaa2 100644 --- a/synapse/lib/httpapi.py +++ b/synapse/lib/httpapi.py @@ -214,19 +214,20 @@ def logAuthIssue(self, mesg=None, user=None, username=None, level=logging.WARNIN ''' uri = self.request.uri remote_ip = self.request.remote_ip - enfo = {'uri': uri, - 'remoteip': remote_ip, - } + erfo = { + 'uri': uri, + 'remote_ip': remote_ip, + } errm = f'Failed to authenticate request to {uri} from {remote_ip} ' if mesg: errm = f'{errm}: {mesg}' if user: errm = f'{errm}: user={user}' - enfo['user'] = user + erfo['target_user'] = user if username: errm = f'{errm} ({username})' - enfo['username'] = username - logger.log(level, msg=errm, extra=s_logging.getLogExtra(**enfo)) + erfo['target_username'] = username + logger.log(level, msg=errm, extra=s_logging.getLogExtra(**erfo)) def sendAuthRequired(self): self.set_header('WWW-Authenticate', 'Basic realm=synapse') diff --git a/synapse/lib/json.py b/synapse/lib/json.py index 6c267cc0ff7..64b1c723ef6 100644 --- a/synapse/lib/json.py +++ b/synapse/lib/json.py @@ -10,6 +10,7 @@ import yyjson import synapse.exc as s_exc +import synapse.lib.logging as s_logging logger = logging.getLogger(__name__) @@ -44,8 +45,8 @@ def loads(s: str | bytes) -> Any: return yyjson.Document(s, flags=yyjson.ReaderFlags.BIGNUM_AS_RAW).as_obj except (ValueError, TypeError) as exc: - extra = {'synapse': {'fn': 'loads', 'reason': str(exc)}} - logger.warning('Using fallback JSON deserialization. Please report this to Vertex.', extra=extra) + logger.warning('Using fallback JSON deserialization. 
Please report this to Vertex.', + extra=s_logging.getLogExtra(fn='loads', reason=(str(exc)))) return _fallback_loads(s) def load(fp: BinaryIO) -> Any: @@ -125,8 +126,8 @@ def dumps(obj: Any, sort_keys: bool = False, indent: bool = False, default: Opti try: return _dumps(obj, sort_keys=sort_keys, indent=indent, default=default, newline=newline) except UnicodeEncodeError as exc: - extra = {'synapse': {'fn': 'dumps', 'reason': str(exc)}} - logger.warning('Using fallback JSON serialization. Please report this to Vertex.', extra=extra) + logger.warning('Using fallback JSON serialization. Please report this to Vertex.', + extra=s_logging.getLogExtra(fn='dumps', reason=(str(exc)))) ret = _fallback_dumps(obj, sort_keys=sort_keys, indent=indent, default=default) diff --git a/synapse/lib/logging.py b/synapse/lib/logging.py index e4ef475705d..1aae11b2d45 100644 --- a/synapse/lib/logging.py +++ b/synapse/lib/logging.py @@ -7,18 +7,19 @@ import collections import synapse.exc as s_exc -import synapse.common as s_common import synapse.lib.coro as s_coro import synapse.lib.json as s_json import synapse.lib.const as s_const import synapse.lib.scope as s_scope -import synapse.lib.version as s_version logger = logging.getLogger(__name__) _log_wins = weakref.WeakSet() +LOG_PUMP_TASK_TIMEOUT = 1 +LOG_QUEUE_SIZES = 1000 + def excinfo(e, _seen=None): if _seen is None: @@ -28,6 +29,9 @@ def excinfo(e, _seen=None): tb = [] for path, line, func, sorc in traceback.extract_tb(e.__traceback__): + # sorc may not be available; ensure that all output is a str + if sorc is None: + sorc = '' tb.append((path, line, func, sorc)) ret = { @@ -50,6 +54,9 @@ def excinfo(e, _seen=None): ret['info'] = e.errinfo.copy() ret['mesg'] = ret['info'].pop('mesg', None) + if isinstance(e, BaseExceptionGroup) and e.exceptions: + ret['group'] = [excinfo(exc) for exc in e.exceptions] + if ret.get('mesg') is None: ret['mesg'] = str(e) @@ -62,10 +69,19 @@ def setLogInfo(name, valu): ''' _glob_loginfo[name] = valu +def 
popLogInfo(name): + ''' + Remove a global value from being added to every log. + ''' + _glob_loginfo.pop(name, None) + def getLogExtra(**kwargs): ''' Construct a properly enveloped log extra dictionary. ''' + # TODO Remove these asserts before merging! + assert 'user' not in kwargs + assert 'username' not in kwargs extra = {'params': kwargs, 'loginfo': {}} return extra @@ -80,6 +96,8 @@ def genLogInfo(self, record): 'logger': { 'name': record.name, 'func': record.funcName, + 'process': record.processName, + 'thread': record.threadName, }, 'level': record.levelname, 'time': self.formatTime(record, self.datefmt), @@ -95,7 +113,6 @@ def genLogInfo(self, record): loginfo['username'] = user.name elif (sess := s_scope.get('sess')) is not None: - loginfo['sess'] = sess.iden if sess.user is not None: loginfo['user'] = sess.user.iden loginfo['username'] = sess.user.name @@ -133,11 +150,12 @@ class StreamHandler(logging.StreamHandler): _pump_task = None _pump_event = None + _pump_exit_flag = False _glob_handler = None - _logs_fifo = collections.deque(maxlen=1000) - _logs_todo = collections.deque(maxlen=1000) - _text_todo = collections.deque(maxlen=1000) + _logs_fifo = collections.deque(maxlen=LOG_QUEUE_SIZES) + _logs_todo = collections.deque(maxlen=LOG_QUEUE_SIZES) + _text_todo = collections.deque(maxlen=LOG_QUEUE_SIZES) def emit(self, record): @@ -172,7 +190,9 @@ async def _pumpLogStream(): if not logstodo and not texttodo: StreamHandler._pump_event.clear() - continue + if StreamHandler._pump_exit_flag is True: + return + continue # pragma: no cover StreamHandler._logs_todo.clear() StreamHandler._text_todo.clear() @@ -182,11 +202,18 @@ async def _pumpLogStream(): for wind in _log_wins: await wind.puts(logstodo) - + # Don't hold onto refs of the Window objects inside of this function after we have used them. + # If we don't clear this ref, then we will hold a reference to the window object longer than needed. 
+ # This can lead to the last window object never being GC'd while the pumpLogStream task is running, + # even after its caller has exited the watch() function. + wind = None # NOQA await s_coro.executor(_writestderr, fulltext) - except Exception as e: - traceback.print_exc() + if StreamHandler._pump_exit_flag is True and len(StreamHandler._logs_todo) == 0 and len(StreamHandler._text_todo) == 0: + return + + except Exception: + _writestderr('Error during log handling:\n' + traceback.format_exc()) def logs(last=100): return tuple(StreamHandler._logs_fifo)[-last:] @@ -249,7 +276,7 @@ def setup(**conf): return conf -def reset(): +def reset(clear_globconf=True): # This may be called by tests to cleanup loop specific objects # ( it does not need to be called by in general by service fini ) @@ -262,13 +289,51 @@ def reset(): StreamHandler._pump_task = None StreamHandler._pump_event = None + StreamHandler._pump_exit_flag = False StreamHandler._glob_handler = None StreamHandler._text_todo.clear() StreamHandler._logs_fifo.clear() StreamHandler._logs_todo.clear() - _glob_logconf.clear() - _glob_loginfo.clear() + if clear_globconf: + _glob_logconf.clear() + _glob_loginfo.clear() + +async def _shutdown_task(): + # Give the pump task a small opportunity to drain its + # queue of items and exit cleanly. + if StreamHandler._pump_task is not None: + StreamHandler._pump_exit_flag = True # Set the task to exit + StreamHandler._pump_event.set() # Wake the task + try: + await asyncio.wait_for(StreamHandler._pump_task, timeout=LOG_PUMP_TASK_TIMEOUT) + except asyncio.TimeoutError: # pragma: no cover + pass + + +async def shutdown(): # pragma: no cover + ''' + Inverse of setup. Gives the pump task the opportunity to exit + before removing it and resetting log attributes. A StreamHandler + is then re-installed on the root logger to allow for messages + from sources like atexit handlers to be logged. + + This should be called at service or tool teardown. 
+ ''' + await _shutdown_task() + + # Reset all logging configs except globals since we may need those. + reset(clear_globconf=False) + + fmtclass = JsonFormatter + if not _glob_logconf.get('structlog'): + fmtclass = TextFormatter + + # Reinstall a StreamHandler and the formatter on the root logger + rootlogger = logging.getLogger() + stream = logging.StreamHandler() + stream.setFormatter(fmtclass(datefmt=_glob_logconf.get('datefmt'))) + rootlogger.addHandler(stream) def getLogConf(): logconf = _glob_logconf.copy() diff --git a/synapse/lib/modelrev.py b/synapse/lib/modelrev.py index 8d35ebd9cbf..1e3f6dc4b35 100644 --- a/synapse/lib/modelrev.py +++ b/synapse/lib/modelrev.py @@ -932,7 +932,7 @@ async def save(): oldm = e.errinfo.get('mesg') iden = s_common.ehex(buid) logger.warning(f'error re-norming {prop.form.name}:{prop.name}={propvalu} (layer: {layr.iden}, node: {iden}): {oldm}', - extra={'synapse': {'node': iden, 'layer': layr.iden}}) + extra=self.core.getLogExtra(node=iden, layer=layr.iden)) continue if norm == propvalu: @@ -1001,7 +1001,7 @@ async def save(): oldm = e.errinfo.get('mesg') iden = s_common.ehex(buid) logger.warning(f'error re-norming {prop.full}={propvalu} (layer: {layr.iden}, node: {iden}): {oldm}', - extra={'synapse': {'node': iden, 'layer': layr.iden}}) + extra=self.core.getLogExtra(node=iden, layer=layr.iden)) continue nodeedits.append( diff --git a/synapse/lib/nexus.py b/synapse/lib/nexus.py index 7a3a8c3fac4..b93ed2205aa 100644 --- a/synapse/lib/nexus.py +++ b/synapse/lib/nexus.py @@ -403,6 +403,10 @@ async def eat(self, nexsiden, event, args, kwargs, meta, wait=True): # Keep a reference to the shielded task to ensure it isn't GC'd self.applytask = asyncio.create_task(self._eat((nexsiden, event, args, kwargs, meta))) + # Clone the current scope to the applytask, so that log events in the scope + # would have access to any user / sess values which have been set. 
+ s_scope.clone(self.applytask) + except: self.cell.nexslock.release() raise diff --git a/synapse/lib/processpool.py b/synapse/lib/processpool.py index 281e29bbf54..677b8a67c00 100644 --- a/synapse/lib/processpool.py +++ b/synapse/lib/processpool.py @@ -14,7 +14,6 @@ logger = logging.getLogger(__name__) import synapse.exc as s_exc -import synapse.common as s_common import synapse.lib.logging as s_logging import synapse.lib.process as s_process diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index 368fcb46d37..6010c69392d 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -1259,16 +1259,16 @@ async def __anit__(self, core, iden, ddef): self.onfini(self.stop) async def stop(self): - logger.debug(f'Stopping Dmon {self.iden}', extra={'synapse': {'iden': self.iden}}) + logger.debug(f'Stopping Dmon {self.iden}', extra=self.core.getLogExtra(iden=self.iden)) if self.task is not None: self.task.cancel() self.task = None - logger.debug(f'Stopped Dmon {self.iden}', extra={'synapse': {'iden': self.iden}}) + logger.debug(f'Stopped Dmon {self.iden}', extra=self.core.getLogExtra(iden=self.iden)) async def run(self): if self.task: # pragma: no cover raise s_exc.SynErr(mesg=f'Dmon - {self.iden} - has a current task and cannot start a new one.', - iden=self.iden) + extra=self.core.getLogExtra(iden=self.iden)) self.task = self.schedCoro(self.dmonloop()) async def bump(self): @@ -1290,7 +1290,7 @@ def _getRunLog(self): async def dmonloop(self): - logger.debug(f'Starting Dmon {self.iden}', extra={'synapse': {'iden': self.iden}}) + logger.debug(f'Starting Dmon {self.iden}', extra=self.core.getLogExtra(iden=self.iden)) s_scope.set('user', self.user) s_scope.set('storm:dmon', self.iden) @@ -1311,26 +1311,26 @@ async def dmonloop(self): def dmonPrint(evnt): self._runLogAdd(evnt) mesg = evnt[1].get('mesg', '') - logger.info(f'Dmon - {self.iden} - {mesg}', extra={'synapse': {'iden': self.iden}}) + logger.info(f'Dmon - {self.iden} - {mesg}', 
extra=self.core.getLogExtra(iden=self.iden)) def dmonWarn(evnt): self._runLogAdd(evnt) mesg = evnt[1].get('mesg', '') - logger.warning(f'Dmon - {self.iden} - {mesg}', extra={'synapse': {'iden': self.iden}}) + logger.warning(f'Dmon - {self.iden} - {mesg}', extra=self.core.getLogExtra(iden=self.iden)) while not self.isfini: if self.user.info.get('locked'): self.status = 'fatal error: user locked' logger.warning(f'Dmon user is locked. Stopping Dmon {self.iden}.', - extra={'synapse': {'iden': self.iden}}) + extra=self.core.getLogExtra(iden=self.iden)) return view = self.core.getView(viewiden, user=self.user) if view is None: self.status = 'fatal error: invalid view' logger.warning(f'Dmon View is invalid. Stopping Dmon {self.iden}.', - extra={'synapse': {'iden': self.iden}}) + extra=self.core.getLogExtra(iden=self.iden)) return try: @@ -1348,7 +1348,7 @@ def dmonWarn(evnt): self.count += 1 await asyncio.sleep(0) - logger.warning(f'Dmon query exited: {self.iden}', extra={'synapse': {'iden': self.iden}}) + logger.warning(f'Dmon query exited: {self.iden}', extra=self.core.getLogExtra(iden=self.iden)) self.status = 'sleeping' @@ -1361,7 +1361,7 @@ def dmonWarn(evnt): except Exception as e: self._runLogAdd(('err', s_common.excinfo(e))) - logger.exception(f'Dmon error ({self.iden})', extra={'synapse': {'iden': self.iden}}) + logger.exception(f'Dmon error ({self.iden})', extra=self.core.getLogExtra(iden=self.iden)) self.status = f'error: {e}' self.err_evnt.set() diff --git a/synapse/lib/structlog.py b/synapse/lib/structlog.py index 8be80867c02..35333fde38b 100644 --- a/synapse/lib/structlog.py +++ b/synapse/lib/structlog.py @@ -7,6 +7,7 @@ class JsonFormatter(logging.Formatter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + s_common.deprdate('synapse.lib.structlog.JsonFormatter', '2026-04-01') def format(self, record: logging.LogRecord): diff --git a/synapse/lib/view.py b/synapse/lib/view.py index ced3afcff9c..b0c13aa31a1 100644 --- 
a/synapse/lib/view.py +++ b/synapse/lib/view.py @@ -957,38 +957,37 @@ async def eval(self, text, opts=None): async def callStorm(self, text, opts=None): user = self.core._userFromOpts(opts) - try: - - async for item in self.eval(text, opts=opts): - await asyncio.sleep(0) # pragma: no cover - - except s_stormctrl.StormReturn as e: - # Catch return( ... ) values and return the - # primitive version of that item. - return await s_stormtypes.toprim(e.item) + with s_scope.enter({'user': user}): + try: - except asyncio.CancelledError: - extra = self.core.getLogExtra(text=text) - logger.warning(f'callStorm cancelled', extra=extra) - raise + async for item in self.eval(text, opts=opts): + await asyncio.sleep(0) # pragma: no cover - except (s_stormctrl.StormLoopCtrl, s_stormctrl.StormGenrCtrl) as e: - if isinstance(e, s_stormctrl.StormLoopCtrl): - mesg = f'Loop control statement "{e.statement}" used outside of a loop.' - else: - mesg = f'Generator control statement "{e.statement}" used outside of a generator function.' - logmesg = f'Error during storm execution for {{ {text} }} - {mesg}' - extra = self.core.getLogExtra(text=text) - logger.exception(logmesg, extra=extra) - raise s_exc.StormRuntimeError(mesg=mesg, statement=e.statement, highlight=e.get('highlight')) from e + except s_stormctrl.StormReturn as e: + # Catch return( ... ) values and return the + # primitive version of that item. + return await s_stormtypes.toprim(e.item) - except Exception: - extra = self.core.getLogExtra(text=text) - logger.exception(f'Error during callStorm execution for {{ {text} }}', extra=extra) - raise + except asyncio.CancelledError: + extra = self.core.getLogExtra(text=text) + logger.warning(f'callStorm cancelled', extra=extra) + raise - # Any other exceptions will be raised to - # callers as expected. 
+ except (s_stormctrl.StormLoopCtrl, s_stormctrl.StormGenrCtrl) as e: + if isinstance(e, s_stormctrl.StormLoopCtrl): + mesg = f'Loop control statement "{e.statement}" used outside of a loop.' + else: + mesg = f'Generator control statement "{e.statement}" used outside of a generator function.' + logmesg = f'Error during storm execution for {{ {text} }} - {mesg}' + logger.exception(logmesg, extra=self.core.getLogExtra(text=text)) + raise s_exc.StormRuntimeError(mesg=mesg, statement=e.statement, highlight=e.get('highlight')) from e + + except Exception: + logger.exception(f'Error during callStorm execution for {{ {text} }}', + extra=self.core.getLogExtra(text=text, view=self.iden)) + # Any other exceptions will be raised to + # callers as expected. + raise async def nodes(self, text, opts=None): ''' @@ -1041,6 +1040,7 @@ async def runStorm(): tick = s_common.now() abstick = s_common.mononow() count = 0 + try: # Always start with an init message. @@ -1053,30 +1053,28 @@ async def runStorm(): shownode = (not show or 'node' in show) - with s_scope.enter({'user': user}): + async with await self.snap(user=user) as snap: - async with await self.snap(user=user) as snap: + if keepalive: + snap.schedCoro(snap.keepalive(keepalive)) - if keepalive: - snap.schedCoro(snap.keepalive(keepalive)) + if not show: + snap.link(chan.put) - if not show: - snap.link(chan.put) - - else: - [snap.on(n, chan.put) for n in show] + else: + [snap.on(n, chan.put) for n in show] - if shownode: - async for pode in snap.iterStormPodes(text, opts=opts, user=user): - await chan.put(('node', pode)) - count += 1 + if shownode: + async for pode in snap.iterStormPodes(text, opts=opts, user=user): + await chan.put(('node', pode)) + count += 1 - else: - info = opts.get('_loginfo', {}) - info.update({'mode': opts.get('mode', 'storm'), 'view': self.iden}) - self.core._logStormQuery(text, user, info=info) - async for item in snap.storm(text, opts=opts, user=user): - count += 1 + else: + info = 
opts.get('_loginfo', {}) + info.update({'mode': opts.get('mode', 'storm'), 'view': self.iden}) + self.core._logStormQuery(text, user, info=info) + async for item in snap.storm(text, opts=opts, user=user): + count += 1 except s_stormctrl.StormExit: pass @@ -1099,8 +1097,7 @@ async def runStorm(): if mesg: logmesg = f'{logmesg} - {mesg}' - extra = self.core.getLogExtra(text=text) - logger.exception(logmesg, extra=extra) + logger.exception(logmesg, extra=self.core.getLogExtra(text=text, view=self.iden)) enfo = s_common.err(e) enfo[1].pop('esrc', None) @@ -1114,43 +1111,45 @@ async def runStorm(): tock = tick + abstook await chan.put(('fini', {'tock': tock, 'abstock': abstock, 'took': abstook, 'count': count, })) - await synt.worker(runStorm(), name='runstorm') - - editformat = opts.get('editformat', 'nodeedits') + with s_scope.enter({'user': user}): - while True: + await synt.worker(runStorm(), name='runstorm') - mesg = await chan.get() - kind = mesg[0] + editformat = opts.get('editformat', 'nodeedits') - if kind == 'node': - yield mesg - continue + while True: - if kind == 'node:edits': - if editformat == 'nodeedits': + mesg = await chan.get() + kind = mesg[0] - nodeedits = s_common.jsonsafe_nodeedits(mesg[1]['edits']) - mesg[1]['edits'] = nodeedits + if kind == 'node': yield mesg - continue - if editformat == 'none': - continue + if kind == 'node:edits': + if editformat == 'nodeedits': - assert editformat == 'count' + nodeedits = s_common.jsonsafe_nodeedits(mesg[1]['edits']) + mesg[1]['edits'] = nodeedits + yield mesg - count = sum(len(edit[2]) for edit in mesg[1].get('edits', ())) - mesg = ('node:edits:count', {'count': count}) - yield mesg - continue + continue - if kind == 'fini': - yield mesg - break + if editformat == 'none': + continue + + assert editformat == 'count' - yield mesg + count = sum(len(edit[2]) for edit in mesg[1].get('edits', ())) + mesg = ('node:edits:count', {'count': count}) + yield mesg + continue + + if kind == 'fini': + yield mesg + break + 
+ yield mesg async def iterStormPodes(self, text, opts=None): diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index 6d0950254ea..d7d8107f3a1 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -3528,6 +3528,29 @@ async def test_storm_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('view'), view) + self.eq(mesg['params'].get('text'), 'help foo') + self.eq(mesg['username'], 'root') + + udef = await core.addUser('foouser', ) + await core.setUserAdmin(udef.get('iden'), True) + asfoo = {'user': udef.get('iden')} + + with self.getLoggerStream('synapse.storm') as stream: + await alist(core.storm('help ask', opts=asfoo)) + + mesg = stream.jsonlines()[0] + self.eq(mesg['params'].get('view'), view) + self.eq(mesg['params'].get('text'), 'help ask') + self.eq(mesg['username'], 'foouser') + + q = '[test:str=hehe] [test:int=$node.value()]' + with self.getLoggerStream('synapse.lib.view') as stream: + await alist(core.storm(q, opts=asfoo)) + msgs = stream.jsonlines() + emsg = [m for m in msgs if 'Error during storm execution' in m.get('message')][0] + self.eq(emsg['params'].get('view'), view) + self.eq(emsg['params'].get('text'), q) + self.eq(emsg['username'], 'foouser') async def test_strict(self): diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index 1e7ee0c4cb1..4900fe364e8 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -2630,7 +2630,7 @@ async def wrapDelWriteHold(root, reason): log_enable_writes = f'Free space on {core.dirn} above minimum threshold' with self.getLoggerStream('synapse.lib.cell') as stream: await core.nexsroot.addWriteHold(tmp_reason := 'something else') - with self.raises(s_exc.SynErr): + with self.raises(AssertionError): await stream.expect(log_enable_writes, timeout=1) stream.seek(0) self.eq(stream.read(), '') diff --git a/synapse/tests/test_lib_httpapi.py b/synapse/tests/test_lib_httpapi.py index 
41c24c40dd6..dd3fd8c30d0 100644 --- a/synapse/tests/test_lib_httpapi.py +++ b/synapse/tests/test_lib_httpapi.py @@ -1947,8 +1947,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/auth/adduser') - self.eq(mesg['params'].get('username'), 'root') - self.eq(mesg['params'].get('user'), core.auth.rootuser.iden) + self.eq(mesg['username'], 'root') + self.eq(mesg['user'], core.auth.rootuser.iden) self.isin('headers', mesg['params']) self.eq(mesg['params']['headers'].get('user-agent'), 'test_request_logging') self.isin('remoteip', mesg['params']) @@ -1965,8 +1965,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/active') self.notin('headers', mesg['params']) - self.notin('username', mesg['params']) - self.notin('user', mesg['params']) + self.notin('username', mesg) + self.notin('user', mesg) self.isin('remoteip', mesg['params']) self.isin('200 GET /api/v1/active', mesg.get('message')) @@ -1981,8 +1981,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/login') - self.eq(mesg['params'].get('username'), 'visi') - self.eq(mesg['params'].get('user'), visiiden) + self.eq(mesg['username'], 'visi') + self.eq(mesg['user'], visiiden) # session cookie loging populates the data upon reuse with self.getLoggerStream(logname) as stream: @@ -1992,8 +1992,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/auth/users') - self.eq(mesg['params'].get('username'), 'visi') - self.eq(mesg['params'].get('user'), visiiden) + self.eq(mesg['username'], 'visi') + self.eq(mesg['user'], visiiden) async with self.getTestCore(conf={'https:parse:proxy:remoteip': True}) as core: @@ -2021,8 +2021,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/auth/adduser') - 
self.eq(mesg['params'].get('username'), 'root') - self.eq(mesg['params'].get('user'), core.auth.rootuser.iden) + self.eq(mesg['username'], 'root') + self.eq(mesg['user'], core.auth.rootuser.iden) self.eq(mesg['params'].get('remoteip'), '1.2.3.4') self.isin('(root)', mesg.get('message')) self.isin('200 POST /api/v1/auth/adduser', mesg.get('message')) @@ -2039,8 +2039,8 @@ async def test_request_logging(self): mesg = stream.jsonlines()[0] self.eq(mesg['params'].get('uri'), '/api/v1/auth/adduser') - self.eq(mesg['params'].get('username'), 'root') - self.eq(mesg['params'].get('user'), core.auth.rootuser.iden) + self.eq(mesg['username'], 'root') + self.eq(mesg['user'], core.auth.rootuser.iden) self.eq(mesg['params'].get('remoteip'), '8.8.8.8') self.isin('(root)', mesg.get('message')) self.isin('200 POST /api/v1/auth/adduser', mesg.get('message')) diff --git a/synapse/tests/test_lib_logging.py b/synapse/tests/test_lib_logging.py index eb8d14fefa3..734997b96ea 100644 --- a/synapse/tests/test_lib_logging.py +++ b/synapse/tests/test_lib_logging.py @@ -1,9 +1,13 @@ +import gc import asyncio import logging +import unittest.mock as mock + import synapse.exc as s_exc -import synapse.lib.base as s_base +import synapse.lib.coro as s_coro +import synapse.lib.json as s_json import synapse.lib.logging as s_logging import synapse.tests.utils as s_test @@ -12,9 +16,7 @@ class LoggingTest(s_test.SynTest): - async def test_lib_logging(self): - - s_logging.setup() + async def test_lib_logging_norm(self): self.eq(10, s_logging.normLogLevel(' 10 ')) self.eq(10, s_logging.normLogLevel(10)) @@ -32,14 +34,270 @@ async def test_lib_logging(self): with self.raises(s_exc.BadArg): s_logging.normLogLevel({'key': 'newp'}) - s_logging.reset() + async def test_lib_logging_base(self): + # Ensure we're starting the test from a clean slate + self.none(s_logging.StreamHandler._pump_task) + + # Installs the logginghandlers + s_logging.setup() + self.nn(s_logging.StreamHandler._pump_task) + + # Enwusre 
that while we have a running logging task, that windowing of live log events to a consumer works. + + msgs = [] + # Indicates that the function has started + evnt0 = asyncio.Event() + # Indicates the function has entered the window + evnt1 = asyncio.Event() + # Indicates the function has left Window.__aiter__ - this will start the teardown the Window object, + # which will eventually cause __aiter__ to exit and leave the ioloop. + evnt2 = asyncio.Event() + async def collector(): + evnt0.set() + async for m in s_logging.watch(last=0): + evnt1.set() + msgs.append(m) + if m.get('params').get('fini'): + break + evnt2.set() + return True + + self.len(0, s_logging._log_wins) + + task = s_coro.create_task(collector()) + await asyncio.wait_for(evnt0.wait(), timeout=12) + + logger.error('window0') + + await asyncio.wait_for(evnt1.wait(), timeout=12) + self.len(1, s_logging._log_wins) + + logger.error('window1', extra=s_logging.getLogExtra(fini=True)) + + await asyncio.wait_for(evnt2.wait(), timeout=12) + self.true(await task) + + self.len(2, msgs) + self.eq([m.get('message') for m in msgs], ['window0', 'window1']) + + # Ensure that the ioloop can remove the empty Window.__aiter__ task, + # which will release the Window ref and allow it to be GC'd. + # Test runs gc.collect() as a safety for test stability. 
+ await asyncio.sleep(0) + gc.collect() + self.len(0, s_logging._log_wins) + + s_logging.reset() self.none(s_logging.StreamHandler._pump_task) self.none(s_logging.StreamHandler._pump_event) self.len(0, s_logging.StreamHandler._logs_fifo) self.len(0, s_logging.StreamHandler._text_todo) self.len(0, s_logging.StreamHandler._logs_todo) + # Ensure various log messages are properly captured a structured data in the FIFO and logs() works + s_logging.setLogInfo('someglobal', 'testvalu') + with self.getLoggerStream('synapse.tests.test_lib_logging') as stream: + # Log a few messages + logger.error('test0', extra=s_logging.getLogExtra(foo='bar')) + s_logging.popLogInfo('someglobal') + logger.warning('test1', extra=s_logging.getLogExtra(foo='baz')) + logger.info('test2') + logger.debug('test3') + await stream.expect('test3') + + msgs = s_logging.logs() + self.isinstance(msgs, tuple) + self.len(4, msgs) + + msg0 = msgs[0] + self.eq(msg0.get('message'), 'test0') + self.eq(msg0.get('level'), 'ERROR') + self.eq(msg0.get('someglobal'), 'testvalu') + self.eq(msg0.get('params'), {'foo': 'bar'}) + self.isin('time', msg0) + lnfo = msg0.get('logger') + self.eq(lnfo.get('name'), 'synapse.tests.test_lib_logging') + self.eq(lnfo.get('func'), 'test_lib_logging_base') + self.isin('process', lnfo) + self.isin('thread', lnfo) + + msg1 = msgs[1] + self.eq(msg1.get('message'), 'test1') + self.eq(msg1.get('level'), 'WARNING') + self.notin('someglobal', msg1) + self.eq(msg1.get('params'), {'foo': 'baz'}) + self.isin('time', msg0) + + msg2 = msgs[2] + self.eq(msg2.get('message'), 'test2') + self.eq(msg2.get('level'), 'INFO') + self.eq(msg2.get('params'), {}) + + msg3 = msgs[3] + self.eq(msg3.get('message'), 'test3') + self.eq(msg3.get('level'), 'DEBUG') + self.eq(msg3.get('params'), {}) + + s_logging.reset() + + msgs = s_logging.logs() + self.len(0, msgs) + self.isinstance(msgs, tuple) + self.none(s_logging.StreamHandler._pump_task) + + async def test_lib_logging_window_drop(self): + + # Messages 
can be dropped from the todo queues if the pump task is not allowed + # to wake up & service the queue. + + # Ensure we're starting the test from a clean slate + self.none(s_logging.StreamHandler._pump_task) + + # Installs the logginghandlers + s_logging.setup() + self.nn(s_logging.StreamHandler._pump_task) + + # Ensure that while we have a running logging task, that windowing of live log events to a consumer works. + + msgs = [] + # Indicates that the function has started + evnt0 = asyncio.Event() + # Indicates the function has entered the window + evnt1 = asyncio.Event() + # Indicates the function has left Window.__aiter__ - this will start the teardown the Window object, + # which will eventually cause __aiter__ to exit and leave the ioloop. + evnt2 = asyncio.Event() + + async def collector(): + evnt0.set() + async for m in s_logging.watch(last=0): + evnt1.set() + msgs.append(m) + if m.get('params').get('fini'): + break + evnt2.set() + return True + + self.len(0, s_logging._log_wins) + + task = s_coro.create_task(collector()) + await asyncio.wait_for(evnt0.wait(), timeout=12) + + logger.error('window0') + + await asyncio.wait_for(evnt1.wait(), timeout=12) + self.len(1, s_logging._log_wins) + + # Now log messages which will exceed the buffer size. This will log N+1 messages, + # meaning that buftest-0 will not be present and window1 will be included. 
+ for n in range(s_logging.LOG_QUEUE_SIZES): + logger.error(f'buftest-{n}') + logger.error('window1', extra=s_logging.getLogExtra(fini=True)) + + await asyncio.wait_for(evnt2.wait(), timeout=12) + self.true(await task) + + self.len(2 + s_logging.LOG_QUEUE_SIZES - 1, msgs) + emsgs = ['window0'] + [f'buftest-{n}' for n in range(1, s_logging.LOG_QUEUE_SIZES)] + ['window1'] + self.eq([m.get('message') for m in msgs], emsgs) + + s_logging.reset() + + async def test_lib_logging_shutdown(self): + # Test the _shutdown_task functionality used in s_logging.shutdown() to ensure it drains the logs + # and exits cleanly + + # Installs the logginghandlers + s_logging.setup(structlog=True) + self.nn(s_logging.StreamHandler._pump_task) + + # Let the task startup + await asyncio.sleep(0) + + msgs = [] + nlines = 3 + evnt = asyncio.Event() + + def writemock(text): + lines = text.split('\n') + for line in lines: + if line: + msgs.append(s_json.loads(line)) + if len(msgs) == nlines: + evnt.set() + + # shutdown with a pending log event + with mock.patch('synapse.lib.logging._writestderr', writemock) as patch: + logger.error('message0') + logger.error('message1') + await asyncio.sleep(0) + logger.error('message2') + # Shutdown before nlines have been accounted for + await s_logging._shutdown_task() + self.true(await asyncio.wait_for(evnt.wait(), timeout=12)) + + self.true(s_logging.StreamHandler._pump_task.done()) + self.eq([m.get('message') for m in msgs], ['message0', 'message1', 'message2']) + + s_logging.reset() + + evnt.clear() + msgs.clear() + + s_logging.setup(structlog=True) + await asyncio.sleep(0) + self.false(s_logging.StreamHandler._pump_task.done()) + + # shutdown without a pending log event + with mock.patch('synapse.lib.logging._writestderr', writemock) as patch: + logger.error('message0') + logger.error('message1') + logger.error('message2') + await asyncio.sleep(0) + # Shutdown after nlines have been accounted for + self.true(await asyncio.wait_for(evnt.wait(), 
timeout=12)) + await s_logging._shutdown_task() + + self.true(s_logging.StreamHandler._pump_task.done()) + self.eq([m.get('message') for m in msgs], ['message0', 'message1', 'message2']) + + async def test_lib_logging_pump_error(self): + # Ensure that exceptions are captured and printed to stderr directly + s_logging.setup(structlog=True) + self.nn(s_logging.StreamHandler._pump_task) + + evnt = asyncio.Event() + + data = [] + def writemock(text): + data.append(text) + evnt.set() + + with mock.patch('synapse.lib.logging._writestderr', writemock): + s_logging.StreamHandler._text_todo.append('hehe') + s_logging.StreamHandler._logs_todo.append('hehe') + + s_logging.StreamHandler._text_todo.append(1234) + s_logging.StreamHandler._logs_todo.append('newp') + + # This will cause a type error trying to join hehe and 1234 together + # when the task wakes up and and tries to process the queue + + s_logging.StreamHandler._pump_event.set() + + self.true(await asyncio.wait_for(evnt.wait(), timeout=12)) + + text = '\n'.join(data) + self.isin('Error during log handling', text) + self.isin('Traceback', text) + + s_logging.reset() + + async def test_lib_logging_exception(self): + + # Ensure that various exception information is captured + with self.getLoggerStream('synapse.tests.test_lib_logging') as stream: try: @@ -77,3 +335,47 @@ async def test_lib_logging(self): self.eq(mesg['error']['notes'], ('outer note',)) self.eq(mesg['error']['context']['code'], 'SynErr') self.eq(mesg['error']['context']['notes'], ('inner note',)) + + stream.clear() + + # This is derived from the cpython Stdlib exception group example here + # https://docs.python.org/3.11/tutorial/errors.html#tut-exception-groups + + def f(): + raise ExceptionGroup( + "group1", + [ + ValueError(1), + SystemError(2), + ExceptionGroup( + "group2", + [ + OSError(3), + RecursionError(4) + ] + ) + ] + ) + + try: + f() + except* OSError as e: + pass + except* SystemError as e: + pass + except* Exception as e: + 
logger.exception('Error encountered in EG') + + mesg = stream.jsonlines()[0] + self.eq(mesg['message'], 'Error encountered in EG') + self.eq(mesg['error']['code'], 'ExceptionGroup') + self.eq(mesg['error']['mesg'], 'group1 (2 sub-exceptions)') + group = mesg['error']['group'] + self.len(2, group) + self.eq(group[0]['code'], 'ValueError') + self.eq(group[0]['mesg'], '1') + + self.eq(group[1]['code'], 'ExceptionGroup') + self.eq(group[1]['mesg'], 'group2 (1 sub-exception)') + self.eq(group[1]['group'][0]['code'], 'RecursionError') + self.eq(group[1]['group'][0]['mesg'], '4') diff --git a/synapse/tests/test_lib_structlog.py b/synapse/tests/test_lib_structlog.py index bfb4b0610ca..8c05201bd35 100644 --- a/synapse/tests/test_lib_structlog.py +++ b/synapse/tests/test_lib_structlog.py @@ -94,6 +94,9 @@ def test_structlog_datefmt(self): handler = logging.StreamHandler(stream=stream) datefmt = '%m-%Y-%d' # MMYYYYYDD formatter = s_structlog.JsonFormatter(datefmt=datefmt) + # Default convertor for time is time.localtime but that is + # not stable for testing. Use time.gmtime so tests always work. 
+ formatter.converter = time.gmtime handler.setFormatter(formatter) logger.addHandler(handler) diff --git a/synapse/tests/test_telepath.py b/synapse/tests/test_telepath.py index ce04cf18405..8afc9f4aad1 100644 --- a/synapse/tests/test_telepath.py +++ b/synapse/tests/test_telepath.py @@ -1417,7 +1417,7 @@ async def test_telepath_exception_logging(self): except asyncio.CancelledError: pass - with self.raises(s_exc.SynErr): + with self.raises(AssertionError): await stream.expect('error during task: callStorm', timeout=0.5) with self.getLoggerStream('synapse.daemon') as stream: diff --git a/synapse/tests/test_utils.py b/synapse/tests/test_utils.py index 4fe46eb2556..7d24bec5f82 100644 --- a/synapse/tests/test_utils.py +++ b/synapse/tests/test_utils.py @@ -106,7 +106,7 @@ async def test_syntest_logstream(self): logger.error('ruh roh i am a error message') await stream.expect('ruh roh i am a error message', timeout=1) - with self.raises(s_exc.SynErr): + with self.raises(AssertionError): await stream.expect('does not exist', timeout=0.01) self.notin('newp', stream.getvalue()) diff --git a/synapse/tests/utils.py b/synapse/tests/utils.py index 9e7d43d77e0..4baeb8a7a81 100644 --- a/synapse/tests/utils.py +++ b/synapse/tests/utils.py @@ -817,21 +817,33 @@ def clear(self): self._lines.clear() self.seek(0) self.truncate() + self._event.clear() def write(self, s): - retn = io.StringIO.write(self, s) + io.StringIO.write(self, s) self._lines.append(s) self._event.set() - async def expect(self, text, count=1, timeout=5, escape=True): + async def expect(self, text, count=1, timeout=6, escape=True): + ''' + Expect a string to be present in the logged text. + + Args: + text: String to search the logs for. + count: Number of occurances of the entry. + timeout: Amount of time to wait for the text to be logged. + escape: re.escape() the provided text; to to false is text is a regular expression. + Returns: + True if the text is found. 
+ ''' try: coro = self._expect(text, count=count, escape=escape) await s_common.wait_for(coro, timeout=timeout) except TimeoutError: logger.warning(f'Pattern [{text}] not found in...') [logger.warning(f' {line}') for line in self._lines] - raise s_exc.SynErr(mesg=f'Pattern [{text}] not found!') + raise AssertionError(f'Pattern [{text}] not found!') async def _expect(self, text, count=1, escape=True): @@ -1045,7 +1057,6 @@ def __init__(self, *args, **kwargs): def tearDown(self): s_logging.reset() - return super().tearDown() def checkNode(self, node, expected): ex_ndef, ex_props = expected