Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions synapse/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,15 +781,24 @@ def makedirs(path, mode=0o777):
def iterzip(*args, fillvalue=None):
    '''
    Zip together multiple iterables, padding any shorter ones with fillvalue
    so iteration continues until the longest input is exhausted.
    '''
    return itertools.zip_longest(*args, fillvalue=fillvalue)

def _getLogConfFromEnv(defval=None, structlog=None, datefmt=None, qsize=None):
    '''
    Resolve logging configuration values, allowing environment variables
    to override the provided defaults.

    Args:
        defval (str): Default log level (overridden by SYN_LOG_LEVEL).
        structlog (bool): Enable structured (jsonl) output (overridden by SYN_LOG_STRUCT).
        datefmt (str): Optional strftime format string (overridden by SYN_LOG_DATEFORMAT).
        qsize (int): Log handler queue size (overridden by SYN_LOG_QSIZE).

    Returns:
        dict: Resolved config with keys defval, structlog, datefmt, qsize.

    Raises:
        s_exc.BadArg: If the resolved queue size is negative.
    '''
    # envbool() compares against string truth values, so normalize the bool.
    if structlog:
        structlog = 'true'
    else:
        structlog = 'false'

    if qsize is None:
        qsize = s_structlog.DEFAULT_QSIZE

    defval = os.getenv('SYN_LOG_LEVEL', defval)
    datefmt = os.getenv('SYN_LOG_DATEFORMAT', datefmt)
    structlog = envbool('SYN_LOG_STRUCT', structlog)

    qsize = int(os.getenv('SYN_LOG_QSIZE', qsize))
    if qsize < 0:
        raise s_exc.BadArg(mesg=f'SYN_LOG_QSIZE must be >=0, got {qsize}')

    ret = {'defval': defval, 'structlog': structlog, 'datefmt': datefmt, 'qsize': qsize}
    return ret

def normLogLevel(valu):
Expand Down Expand Up @@ -820,7 +829,7 @@ def normLogLevel(valu):
return normLogLevel(valu)
raise s_exc.BadArg(mesg=f'Unknown log level type: {type(valu)} {valu}', valu=valu)

def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None):
def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=None, qsize=None):
'''
Configure synapse logging.

Expand All @@ -829,15 +838,16 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non
defval (str): Default log level. May be an integer.
structlog (bool): Enabled structured (jsonl) logging output.
datefmt (str): Optional strftime format string.

qsize (int): Optional queue size for the root logger.
Notes:
This calls logging.basicConfig and should only be called once per process.

Returns:
None
'''
ret = _getLogConfFromEnv(defval, structlog, datefmt)
ret = _getLogConfFromEnv(defval, structlog, datefmt, qsize)

qsize = ret.get('qsize')
datefmt = ret.get('datefmt')
log_level = ret.get('defval')
log_struct = ret.get('structlog')
Expand All @@ -846,13 +856,15 @@ def setlogging(mlogger, defval=None, structlog=None, log_setup=True, datefmt=Non

log_level = normLogLevel(log_level)

handler = s_structlog.StreamHandlerWithQueue(qsize=qsize)

if log_struct:
handler = logging.StreamHandler()
formatter = s_structlog.JsonFormatter(datefmt=datefmt)
handler.setFormatter(formatter)
logging.basicConfig(level=log_level, handlers=(handler,))
else:
logging.basicConfig(level=log_level, format=s_const.LOG_FORMAT, datefmt=datefmt)
logging.basicConfig(level=log_level, handlers=(handler,), format=s_const.LOG_FORMAT, datefmt=datefmt)

if log_setup:
mlogger.info('log level set to %s', s_const.LOG_LEVEL_INVERSE_CHOICES.get(log_level))

Expand Down
20 changes: 20 additions & 0 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tempfile
import functools
import contextlib
import collections
import multiprocessing

import aiohttp
Expand Down Expand Up @@ -905,6 +906,11 @@ async def getReloadableSystems(self):
async def reload(self, subsystem=None):
return await self.cell.reload(subsystem=subsystem)

@adminapi(log=True)
async def iterCellLogs(self):
    # Admin-only telepath API: relay the cell's recently cached log
    # messages to the remote caller.
    async for mesg in self.cell.iterCellLogs():
        yield mesg

class Cell(s_nexus.Pusher, s_telepath.Aware):
'''
A Cell() implements a synapse micro-service.
Expand Down Expand Up @@ -1179,6 +1185,12 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
self.nexslock = asyncio.Lock()
self.netready = asyncio.Event()

self._syn_log_queue = None # type: collections.deque
for handler in logging.getLogger('').handlers:
if hasattr(handler, '_syn_log_queue'):
self._syn_log_queue = handler._syn_log_queue
break

self.conf = self._initCellConf(conf)
self.features = {
'tellready': 1,
Expand Down Expand Up @@ -1531,6 +1543,8 @@ async def fini(self):
retn = await s_nexus.Pusher.fini(self)
if retn == 0:
self._onFiniCellGuid()
if self._syn_log_queue is not None:
self._syn_log_queue = None
return retn

def _onFiniCellGuid(self):
Expand Down Expand Up @@ -5084,6 +5098,12 @@ def getCachedSslCtx(self, opts=None, verify=None):
key = tuple(sorted(opts.items()))
return self._sslctx_cache.get(key)

async def iterCellLogs(self):
    '''
    Yield the log messages currently cached in the cell's in-memory log queue.
    '''
    if self._syn_log_queue is None:
        raise s_exc.SynErr(mesg='no queue available')

    # Snapshot the deque so concurrent log writes cannot mutate it mid-iteration.
    for mesg in tuple(self._syn_log_queue):
        yield mesg
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

including leading timestamp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Window this output so we can do live output


async def freeze(self, timeout=30):

if self.paused:
Expand Down
47 changes: 47 additions & 0 deletions synapse/lib/stormlib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,27 @@ class CellLib(s_stormtypes.Lib):
'(defaults to the Cortex if not provided).'},
),
'returns': {'type': 'dict', 'desc': 'A dictionary containing uptime data.', }}},
{'name': 'iterLogs', 'desc': 'Iterate over recent service logs.',
'type': {'type': 'function', '_funcname': '_methIterLogs',
'args': (
{'name': 'name', 'type': 'str', 'default': None,
# FIXME This may need to accommodate other aha / telepath values
'desc': 'The name, or iden, of the service to get uptime data for '
'(defaults to the Cortex if not provided).'},
),
# FIXME DISCUSS
# The real q is do we want the Telepath/Storm API endpoints to yield strings
# OR do we want to yield struct log messages as dictionaries ?
# Strings are EASY to grab and capture; grabbing the structlog data is
# more complicated.
'returns': {'name': 'Yields', 'type': 'str'},
}}
)
_storm_lib_path = ('cell',)
# FIXME populate
# _storm_lib_perms = (
#
# )

def __init__(self, runt, name=()):
s_stormtypes.Lib.__init__(self, runt, name=name)
Expand All @@ -192,6 +211,7 @@ def getObjLocals(self):
'hotFixesCheck': self._hotFixesCheck,
'trimNexsLog': self._trimNexsLog,
'uptime': self._uptime,
'iterLogs': self._methIterLogs,
}

@s_stormtypes.stormfunc(readonly=True)
Expand Down Expand Up @@ -339,3 +359,30 @@ async def _uptime(self, name=None):
'starttime': info['cellstarttime'],
'uptime': info['celluptime'],
}

@s_stormtypes.stormfunc(readonly=True)
async def _methIterLogs(self, name=None):
    # Iterate recent log messages from the local Cortex, or from a named
    # storm service when a name/iden is provided.
    name = await s_stormtypes.tostr(name, noneok=True)

    # FIXME PERMS DISCUSSION
    self.runt.confirm(('storm', 'lib', 'cell', 'iterlogs'))
    # if not self.runt.isAdmin():
    #     mesg = '$lib.cell.iterLogs() requires admin privs.'
    #     raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name)

    if name is None:
        # No name given: read the local Cortex log queue directly.
        genr = self.runt.snap.core.iterCellLogs()
    else:
        ssvc = self.runt.snap.core.getStormSvc(name)
        if ssvc is None:
            # FIXME: FALLBACK to lookup via aha or telepath?
            # It sure would be nice to have an easy way to reach the logs from ANYTHING
            # but that may be a later scatter/gather support item.
            mesg = f'No service with name/iden: {name}'
            raise s_exc.NoSuchName(mesg=mesg)
        await ssvc.proxy.waitready()
        genr = ssvc.proxy.iterCellLogs()

    async for mesg in genr:
        yield mesg
        # Yield control to the event loop between messages.
        await asyncio.sleep(0)
13 changes: 13 additions & 0 deletions synapse/lib/structlog.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import json

import logging
import collections

import synapse.common as s_common

DEFAULT_QSIZE = 1024

class StreamHandlerWithQueue(logging.StreamHandler):
    '''
    A logging StreamHandler which also retains the most recently formatted
    log messages (including any leading timestamp) in a bounded in-memory
    queue so they can be inspected later.
    '''
    def __init__(self, stream=None, qsize=DEFAULT_QSIZE):
        super().__init__(stream=stream)
        # Bounded deque: once maxlen is reached the oldest entries are dropped.
        self._syn_log_queue = collections.deque(maxlen=qsize)

    def format(self, record: logging.LogRecord) -> str:
        # Delegate the actual formatting to the base class. The original
        # called self.format() here, which recursed infinitely and raised
        # RecursionError on the first record.
        mesg = super().format(record)
        self._syn_log_queue.append(mesg)
        return mesg

class JsonFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
23 changes: 22 additions & 1 deletion synapse/tests/test_lib_stormlib_cell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import synapse.exc as s_exc

import synapse.lib.aha as s_aha
import synapse.lib.coro as s_coro
import synapse.lib.const as s_const
import synapse.lib.stormlib.cell as s_stormlib_cell
Expand Down Expand Up @@ -351,3 +350,25 @@ async def test_stormfix_riskhasvuln(self):
self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view0', opts={'view': view0}))
self.len(1, await core.nodes('risk:vulnerable -> it:prod:softver +:name=view1', opts={'view': view1}))
self.len(1, await core.nodes('risk:vulnerable -> it:host', opts={'view': view2}))

async def test_stormlib_cell_iterlogs(self):
    # Plain (non-structured) logging: run some storm queries, then replay
    # the cached log messages via $lib.cell.iterLogs().
    with self.getAsyncLoggerStream('', '') as stream:
        conf = {'storm:log': True}
        async with self.getTestCore(conf=conf) as core:
            msgs = await core.stormlist('$lib.print(hello)')
            msgs = await core.stormlist('$lib.log.info(hello)')
            msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }')
            self.stormIsInPrint('Executing storm query {$lib.print(hello)} ', msgs)
            self.stormIsInPrint('Executing storm query {$lib.log.info(hello)}', msgs)
            self.stormIsInPrint('hello', msgs)

    # Structured (jsonl) logging: the replayed messages should be JSON lines
    # containing a "message" key.
    with self.getStructuredAsyncLoggerStream('', '') as stream:
        conf = {'storm:log': True}
        async with self.getTestCore(conf=conf) as core:
            msgs = await core.stormlist('$lib.print(hello)')
            msgs = await core.stormlist('$lib.log.info(hello)')
            msgs = await core.stormlist('for $mesg in $lib.cell.iterLogs() { $lib.print($mesg) }')

            self.stormIsInPrint('"message": "Executing storm query {$lib.print(hello)}', msgs)
            self.stormIsInPrint('"message": "Executing storm query {$lib.log.info(hello)}', msgs)
            self.stormIsInPrint('"message": "hello"', msgs)
4 changes: 2 additions & 2 deletions synapse/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,7 @@ def getAsyncLoggerStream(self, logname, mesg='') -> contextlib.AbstractContextMa
'''
stream = AsyncStreamEvent()
stream.setMesg(mesg)
handler = logging.StreamHandler(stream)
handler = s_structlog.StreamHandlerWithQueue(stream=stream)
slogger = logging.getLogger(logname)
slogger.addHandler(handler)
level = slogger.level
Expand Down Expand Up @@ -1829,7 +1829,7 @@ def getStructuredAsyncLoggerStream(self, logname, mesg='') -> contextlib.Abstrac
'''
stream = AsyncStreamEvent()
stream.setMesg(mesg)
handler = logging.StreamHandler(stream)
handler = s_structlog.StreamHandlerWithQueue(stream=stream)
slogger = logging.getLogger(logname)
formatter = s_structlog.JsonFormatter()
handler.setFormatter(formatter)
Expand Down