Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
5778d9a
wip
invisig0th Jan 20, 2026
9bd5143
wip
invisig0th Jan 20, 2026
f5fcc80
wip
invisig0th Jan 20, 2026
5a8c1bd
wip
invisig0th Jan 20, 2026
de1536a
wip
invisig0th Jan 20, 2026
9e40028
wip
invisig0th Jan 20, 2026
9415449
wip
invisig0th Jan 20, 2026
c8fe9ca
wip
invisig0th Jan 20, 2026
452a96a
wip
invisig0th Jan 20, 2026
1c80349
wip
invisig0th Jan 20, 2026
5b24db6
wip
invisig0th Jan 20, 2026
6aac370
wip
invisig0th Jan 21, 2026
e6b6dee
wip
invisig0th Jan 21, 2026
9824681
wip
invisig0th Jan 21, 2026
0cf1344
wip
invisig0th Jan 21, 2026
9af75dc
wip
invisig0th Jan 21, 2026
f1ce699
wip
invisig0th Jan 21, 2026
90e335e
Merge branch 'visi-logging' of ssh://github.com/vertexproject/synapse…
invisig0th Jan 21, 2026
31cd428
wip
invisig0th Jan 21, 2026
ec0570c
wip
invisig0th Jan 21, 2026
09cb3b3
Merge branch 'visi-logging' of ssh://github.com/vertexproject/synapse…
invisig0th Jan 21, 2026
cae0232
wip
invisig0th Jan 21, 2026
faedce2
wip
invisig0th Jan 21, 2026
3f86328
wip
invisig0th Jan 21, 2026
6ffd293
wip
invisig0th Jan 22, 2026
c316b66
wip
invisig0th Jan 22, 2026
bdd81dc
Merge branch 'master' into visi-logging
invisig0th Jan 22, 2026
1394b73
wip
invisig0th Jan 22, 2026
b09e1a8
wip
invisig0th Jan 22, 2026
713ada2
merge from master
invisig0th Jan 22, 2026
2e4ae95
wip
invisig0th Jan 22, 2026
7a8a704
wip
invisig0th Jan 22, 2026
a5172be
Merge branch 'master' into visi-logging
invisig0th Jan 22, 2026
4115091
wip
invisig0th Jan 22, 2026
f757d12
wip
invisig0th Jan 22, 2026
14ebe4e
tweaks from feedback
invisig0th Jan 23, 2026
c3a03e1
Merge branch 'master' into visi-logging
invisig0th Jan 23, 2026
9e3c5f5
Merge branch 'master' into visi-logging
invisig0th Jan 29, 2026
d7e67f4
Merge branch 'master' into visi-logging
vEpiphyte Feb 2, 2026
ea5affa
Merge branch 'master' into visi-logging
vEpiphyte Feb 5, 2026
19e3a5c
Merge branch 'master' into visi-logging
vEpiphyte Feb 5, 2026
2f8844f
Merge branch 'master' into visi-logging
vEpiphyte Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions synapse/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ async def get(self, sha256, offs=None, size=None):
fsize = await self._reqHas(sha256)

fhash = s_common.ehex(sha256)
logger.debug(f'Getting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash))
logger.debug(f'Getting blob [{fhash}].', extra=self.getLogExtra(sha256=fhash))

if offs is not None or size is not None:

Expand Down Expand Up @@ -1203,7 +1203,7 @@ async def hashset(self, sha256):
await self._reqHas(sha256)

fhash = s_common.ehex(sha256)
logger.debug(f'Getting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash))
logger.debug(f'Getting blob [{fhash}].', extra=self.getLogExtra(sha256=fhash))

hashset = s_hashset.HashSet()

Expand Down Expand Up @@ -1251,7 +1251,7 @@ async def _populate(self, sha256, genr, size):
return int.from_bytes(byts, 'big')

fhash = s_common.ehex(sha256)
logger.debug(f'Saving blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash))
logger.debug(f'Saving blob [{fhash}].', extra=self.getLogExtra(sha256=fhash))

size = await self._saveFileGenr(sha256, genr, size)

Expand Down Expand Up @@ -1378,7 +1378,7 @@ async def _axonFileDel(self, sha256):
return False

fhash = s_common.ehex(sha256)
logger.debug(f'Deleting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash))
logger.debug(f'Deleting blob [{fhash}].', extra=self.getLogExtra(sha256=fhash))

size = int.from_bytes(byts, 'big')
self.axonmetrics.inc('file:count', valu=-1)
Expand Down Expand Up @@ -1447,7 +1447,7 @@ async def readlines(self, sha256, errors='ignore'):
todo = s_common.todo(_spawn_readlines, sock00, errors=errors)
async with await s_base.Base.anit() as scope:

scope.schedCoro(s_process.spawn(todo, log_conf=await self._getSpawnLogConf()))
scope.schedCoro(s_process.spawn(todo, logconf=self.getLogConf()))
feedtask = scope.schedCoro(self._sha256ToLink(sha256, link00))

while not self.isfini:
Expand Down Expand Up @@ -1481,7 +1481,7 @@ async def csvrows(self, sha256, dialect='excel', errors='ignore', **fmtparams):
todo = s_common.todo(_spawn_readrows, sock00, dialect, fmtparams, errors=errors)
async with await s_base.Base.anit() as scope:

scope.schedCoro(s_process.spawn(todo, log_conf=await self._getSpawnLogConf()))
scope.schedCoro(s_process.spawn(todo, logconf=self.getLogConf()))
feedtask = scope.schedCoro(self._sha256ToLink(sha256, link00))

while not self.isfini:
Expand Down Expand Up @@ -1817,7 +1817,7 @@ async def wget(self, url, params=None, headers=None, json=None, body=None, metho
Returns:
dict: An information dictionary containing the results of the request.
'''
logger.debug(f'Wget called for [{url}].', extra=await self.getLogExtra(url=s_urlhelp.sanitizeUrl(url)))
logger.debug(f'Wget called for [{url}].', extra=self.getLogExtra(url=s_urlhelp.sanitizeUrl(url)))

ssl = self.getCachedSslCtx(opts=ssl_opts, verify=ssl)

Expand Down
109 changes: 4 additions & 105 deletions synapse/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import synapse.exc as s_exc
import synapse.lib.const as s_const
import synapse.lib.logging as s_logging
import synapse.lib.msgpack as s_msgpack
import synapse.lib.structlog as s_structlog

Expand Down Expand Up @@ -763,108 +764,6 @@ def makedirs(path, mode=0o777):
def iterzip(*args, fillvalue=None):
return itertools.zip_longest(*args, fillvalue=fillvalue)

def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None):
if structlog:
structlog = 'true'
else:
structlog = 'false'
defval = os.getenv('SYN_LOG_LEVEL', defval)
datefmt = os.getenv('SYN_LOG_DATEFORMAT', datefmt)
structlog = envbool('SYN_LOG_STRUCT', structlog)
ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt}
return ret

def normLogLevel(valu):
'''
Norm a log level value to a integer.

Args:
valu: The value to norm ( a string or integer ).

Returns:
int: A valid Logging log level.
'''
if isinstance(valu, int):
if valu not in s_const.LOG_LEVEL_INVERSE_CHOICES:
raise s_exc.BadArg(mesg=f'Invalid log level provided: {valu}', valu=valu)
return valu
if isinstance(valu, str):
valu = valu.strip()
try:
valu = int(valu)
except ValueError:
valu = valu.upper()
ret = s_const.LOG_LEVEL_CHOICES.get(valu)
if ret is None:
raise s_exc.BadArg(mesg=f'Invalid log level provided: {valu}', valu=valu) from None
return ret
else:
return normLogLevel(valu)
raise s_exc.BadArg(mesg=f'Unknown log level type: {type(valu)} {valu}', valu=valu)

def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None):
'''
Configure synapse logging.

Args:
mlogger (logging.Logger): Reference to a logging.Logger()
defval (str): Default log level. May be an integer.
structlog (bool): Enabled structured (jsonl) logging output.
datefmt (str): Optional strftime format string.

Notes:
This calls logging.basicConfig and should only be called once per process.

Returns:
None
'''
ret = _getLogConfFromEnv(defval, structlog, datefmt)

datefmt = ret.get('datefmt')
log_level = ret.get('defval')
log_struct = ret.get('structlog')

if log_level: # pragma: no cover

log_level = normLogLevel(log_level)

# See https://docs.python.org/3/howto/logging-cookbook.html#blocking-handlers for info on this configuration
logq = queue.SimpleQueue()
handler = logging.handlers.QueueHandler(logq)
stream = logging.StreamHandler()

listener = logging.handlers.QueueListener(logq, stream)
listener.start()

def logfini():
# On shutdown, stop the QueueListener, remove QueueHandler from the
# logger, and add the StreamHandler so we don't lose messages that
# are logged after the QueueListener has shutdown. Messages already
# in the QueueListener will flush as part of the stop(). This is all
# required because atexit handlers are called in reverse order of
# when they were registered. So, we want to make sure to have
# functional logging until the process is completely gone.
listener.stop()
logging.root.removeHandler(handler)
logging.root.addHandler(stream)

atexit.register(logfini)

if log_struct:
formatter = s_structlog.JsonFormatter(datefmt=datefmt)
else:
formatter = logging.Formatter(fmt=s_const.LOG_FORMAT, datefmt=datefmt)

stream.setFormatter(formatter)

logging.root.setLevel(log_level)
logging.root.addHandler(handler)

if log_setup:
mlogger.info('log level set to %s', s_const.LOG_LEVEL_INVERSE_CHOICES.get(log_level))

return ret

syndir_default = '~/.syn'
syndir = os.getenv('SYN_DIR')
if syndir is None:
Expand Down Expand Up @@ -959,14 +858,14 @@ def config(conf, confdefs):

@functools.lru_cache(maxsize=1024)
def deprecated(name, curv='2.x', eolv='3.0.0'):
mesg = f'"{name}" is deprecated in {curv} and will be removed in {eolv}'
logger.warning(mesg, extra={'synapse': {'curv': curv, 'eolv': eolv}})
mesg = f'{name} is deprecated in {curv} and will be removed in {eolv}'
logger.warning(mesg, extra=s_logging.getLogExtra(curv=curv, eolv=eolv))
return mesg

@functools.lru_cache(maxsize=1024)
def deprdate(name, date): # pragma: no cover
mesg = f'{name} is deprecated and will be removed on {date}.'
logger.warning(mesg, extra={'synapse': {'eold': date}})
logger.warning(mesg, extra=s_logging.getLogExtra(eold=date))
return mesg

def jsonsafe_nodeedits(nodeedits):
Expand Down
48 changes: 26 additions & 22 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.grammar as s_grammar
import synapse.lib.httpapi as s_httpapi
import synapse.lib.logging as s_logging
import synapse.lib.msgpack as s_msgpack
import synapse.lib.modules as s_modules
import synapse.lib.schemas as s_schemas
Expand Down Expand Up @@ -965,7 +966,7 @@ async def initServiceStorage(self):
self._initCorePerms()

# Reset the storm:log:level from the config value to an int for internal use.
self.conf['storm:log:level'] = s_common.normLogLevel(self.conf.get('storm:log:level'))
self.conf['storm:log:level'] = s_logging.normLogLevel(self.conf.get('storm:log:level'))
self.stormlog = self.conf.get('storm:log')
self.stormloglvl = self.conf.get('storm:log:level')

Expand Down Expand Up @@ -1057,15 +1058,15 @@ async def initServiceStorage(self):

mesg = f'User {useriden} ({user.name}) has a rule on the "cortex" authgate. This authgate is not used ' \
f'for permission checks and will be removed in Synapse v3.0.0.'
logger.warning(mesg, extra=await self.getLogExtra(user=useriden, username=user.name))
logger.warning(mesg, extra=self.getLogExtra(user=useriden, username=user.name))
for roleiden in ag.gateroles.keys():
role = self.auth.role(roleiden)
if role is None:
continue

mesg = f'Role {roleiden} ({role.name}) has a rule on the "cortex" authgate. This authgate is not used ' \
f'for permission checks and will be removed in Synapse v3.0.0.'
logger.warning(mesg, extra=await self.getLogExtra(role=roleiden, rolename=role.name))
logger.warning(mesg, extra=self.getLogExtra(role=roleiden, rolename=role.name))

self._initVaults()

Expand Down Expand Up @@ -3025,7 +3026,7 @@ async def _normStormPkg(self, pkgdef, validstorm=True):
name = cdef.get('name')
mesg = f"Storm command definition 'forms' key is deprecated and will be removed " \
f"in 3.0.0 (command {name} in package {pkgname})"
logger.warning(mesg, extra=await self.getLogExtra(name=name, pkgname=pkgname))
logger.warning(mesg, extra=self.getLogExtra(name=name, pkgname=pkgname))

for gdef in pkgdef.get('graphs', ()):
gdef['iden'] = s_common.guid((pkgname, gdef.get('name')))
Expand Down Expand Up @@ -3094,7 +3095,7 @@ async def _onload():

await self.fire('core:pkg:onload:start', pkg=name)

logextra = await self.getLogExtra(pkg=name, vers=pkgvers)
logextra = self.getLogExtra(pkg=name, vers=pkgvers)

verskey = 'storage:version'

Expand Down Expand Up @@ -3128,7 +3129,7 @@ async def _onload():
await self.setStormPkgVar(name, verskey, vers)
continue

logextra['synapse']['initvers'] = vers
logextra['params']['initvers'] = vers

logger.info(f'{name} starting init vers={vers}: {vname}', extra=logextra)

Expand Down Expand Up @@ -6158,7 +6159,7 @@ async def count(self, text, opts=None):

if proxy is not None:
proxname = proxy._ahainfo.get('name')
extra = await self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
extra = self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
logger.info(f'Offloading Storm query to mirror {proxname}.', extra=extra)

mirropts = await self._getMirrorOpts(opts)
Expand Down Expand Up @@ -6226,21 +6227,21 @@ async def _getMirrorProxy(self, opts):
return proxy

mesg = f'Pool mirror [{proxyname}] is too far out of sync. Skipping.'
logger.warning(mesg, extra=await self.getLogExtra(delta=delta, mirror=proxyname, mirror_offset=miroffs))
logger.warning(mesg, extra=self.getLogExtra(delta=delta, mirror=proxyname, mirror_offset=miroffs))

except s_exc.ShuttingDown:
mesg = f'Proxy for pool mirror [{proxyname}] is shutting down. Skipping.'
logger.warning(mesg, extra=await self.getLogExtra(mirror=proxyname))
logger.warning(mesg, extra=self.getLogExtra(mirror=proxyname))

except s_exc.IsFini:
mesg = f'Proxy for pool mirror [{proxyname}] was shutdown. Skipping.'
logger.warning(mesg, extra=await self.getLogExtra(mirror=proxyname))
logger.warning(mesg, extra=self.getLogExtra(mirror=proxyname))

except TimeoutError:
mesg = f'Timeout waiting for pool mirror [{proxyname}] Nexus offset.'
logger.warning(mesg, extra=await self.getLogExtra(mirror=proxyname))
logger.warning(mesg, extra=self.getLogExtra(mirror=proxyname))

logger.warning('Pool members exhausted. Running query locally.', extra=await self.getLogExtra())
logger.warning('Pool members exhausted. Running query locally.', extra=self.getLogExtra())
return None

async def storm(self, text, opts=None):
Expand All @@ -6252,7 +6253,7 @@ async def storm(self, text, opts=None):

if proxy is not None:
proxname = proxy._ahainfo.get('name')
extra = await self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
extra = self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
logger.info(f'Offloading Storm query to mirror {proxname}.', extra=extra)

mirropts = await self._getMirrorOpts(opts)
Expand Down Expand Up @@ -6286,7 +6287,7 @@ async def callStorm(self, text, opts=None):

if proxy is not None:
proxname = proxy._ahainfo.get('name')
extra = await self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
extra = self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
logger.info(f'Offloading Storm query to mirror {proxname}.', extra=extra)

mirropts = await self._getMirrorOpts(opts)
Expand Down Expand Up @@ -6316,7 +6317,7 @@ async def exportStorm(self, text, opts=None):

if proxy is not None:
proxname = proxy._ahainfo.get('name')
extra = await self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
extra = self.getLogExtra(mirror=proxname, hash=s_storm.queryhash(text))
logger.info(f'Offloading Storm query to mirror {proxname}.', extra=extra)

mirropts = await self._getMirrorOpts(opts)
Expand Down Expand Up @@ -6443,8 +6444,9 @@ async def stormlist(self, text, opts=None):
async def _getStormEval(self, text):
try:
astvalu = copy.deepcopy(await s_parser.evalcache.aget(text))
except s_exc.FatalErr:
logger.exception(f'Fatal error while parsing [{text}]', extra={'synapse': {'text': text}})
except s_exc.FatalErr: # pragma: no cover
extra = self.getLogExtra(text=text)
logger.exception(f'Fatal error while parsing [{text}]', extra=extra)
await self.fini()
raise
astvalu.init(self)
Expand All @@ -6453,8 +6455,9 @@ async def _getStormEval(self, text):
async def _getStormQuery(self, args):
try:
query = copy.deepcopy(await s_parser.querycache.aget(args))
except s_exc.FatalErr:
logger.exception(f'Fatal error while parsing [{args}]', extra={'synapse': {'text': args[0]}})
except s_exc.FatalErr: # pragma: no cover
extra = self.getLogExtra(text=args[0])
logger.exception(f'Fatal error while parsing [{args}]', extra=extra)
await self.fini()
raise
query.init(self)
Expand Down Expand Up @@ -6507,8 +6510,9 @@ def _logStormQuery(self, text, user, info=None):
info['username'] = user.name
info['user'] = user.iden
info['hash'] = s_storm.queryhash(text)
extra = s_logging.getLogExtra(**info)
stormlogger.log(self.stormloglvl, 'Executing storm query {%s} as [%s]', text, user.name,
extra={'synapse': info})
extra=extra)

async def getNodeByNdef(self, ndef, view=None):
'''
Expand Down Expand Up @@ -6989,7 +6993,7 @@ async def enableCronJob(self, iden):
'''
await self.agenda.enable(iden)
await self.feedBeholder('cron:enable', {'iden': iden}, gates=[iden])
logger.info(f'Enabled cron job {iden}', extra=await self.getLogExtra(iden=iden, status='MODIFY'))
logger.info(f'Enabled cron job {iden}', extra=self.getLogExtra(iden=iden, status='MODIFY'))

@s_nexus.Pusher.onPushAuto('cron:disable')
async def disableCronJob(self, iden):
Expand All @@ -7002,7 +7006,7 @@ async def disableCronJob(self, iden):
await self.agenda.disable(iden)
await self._killCronTask(iden)
await self.feedBeholder('cron:disable', {'iden': iden}, gates=[iden])
logger.info(f'Disabled cron job {iden}', extra=await self.getLogExtra(iden=iden, status='MODIFY'))
logger.info(f'Disabled cron job {iden}', extra=self.getLogExtra(iden=iden, status='MODIFY'))

async def killCronTask(self, iden):
if self.agenda.appts.get(iden) is None:
Expand Down
Loading