Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import asyncio
import logging
import tarfile
import weakref
import argparse
import datetime
import platform
import tempfile
import functools
import contextlib
import collections
import multiprocessing

import aiohttp
Expand Down Expand Up @@ -55,6 +57,7 @@
import synapse.lib.version as s_version
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.thisplat as s_thisplat
import synapse.lib.structlog as s_structlog

import synapse.lib.crypto.passwd as s_passwd

Expand Down Expand Up @@ -192,6 +195,29 @@
logger.info(f'Backup streaming process for [{path}] starting.')
asyncio.run(_iterBackupWork(path, linkinfo))

class LogHandler(logging.Handler):

def __init__(self, cell):
logging.Handler.__init__(self)
self.cell = cell

# TODO This needs some help untangling the multiple ways to
# specify log levels to the Cell...

logconf = cell.conf.get('_log_conf', {})
# how did "defval" become the loglevel key?
loglevel = logconf.get('defval')
if loglevel is None:
loglevel = logging.WARNING

self.setLevel(loglevel)
self.setFormatter(s_structlog.JsonFormatter())

def emit(self, record):
# async needed to append to windows, but firing a task
# seems like overkill...
self.cell.addLogRec(self.format(record))

class CellApi(s_base.Base):

async def __anit__(self, cell, link, user):
Expand Down Expand Up @@ -890,6 +916,10 @@
async def reload(self, subsystem=None):
return await self.cell.reload(subsystem=subsystem)

@adminapi(log=True)
async def getLogs(self, size=100):
return self.cell.getLogs(size=size)

Check warning on line 921 in synapse/lib/cell.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/cell.py#L921

Added line #L921 was not covered by tests

class Cell(s_nexus.Pusher, s_telepath.Aware):
'''
A Cell() implements a synapse micro-service.
Expand Down Expand Up @@ -1238,6 +1268,7 @@

await s_nexus.Pusher.__anit__(self, self.iden)

self._initLogs()
self._initCertDir()

await self.enter_context(s_telepath.loadTeleCell(self.dirn))
Expand Down Expand Up @@ -1473,6 +1504,57 @@

logger.warning(f'...Cell ({self.getCellType()}) auth migration complete!')

def _initLogs(self):
# setup cell log capture
self._log_todo = []
self._log_fifo = collections.deque(maxlen=10000)
self._log_fill = asyncio.Event()
self._log_windows = weakref.WeakSet()

handler = LogHandler(self)
logging.getLogger().addHandler(handler)
def _finiLogs():
self._log_fill.set()
logging.getLogger().removeHandler(handler)

self.onfini(_finiLogs)
self.schedCoro(self._logWindowFiller())

# TODO: add peers=True and use AHA gather
def getLogs(self, size=100):
return self._log_fifo[-size:]

Check warning on line 1525 in synapse/lib/cell.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/cell.py#L1525

Added line #L1525 was not covered by tests

# TODO: add peers=True and use AHA gather and mergesort
async def getLogTail(self):

async with await s_queue.Window.anit(maxsize=1000) as wind:
self._log_windows.add(wind)

Check warning on line 1531 in synapse/lib/cell.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/cell.py#L1530-L1531

Added lines #L1530 - L1531 were not covered by tests

for logrec in wind:
yield logrec

Check warning on line 1534 in synapse/lib/cell.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/cell.py#L1533-L1534

Added lines #L1533 - L1534 were not covered by tests

def addLogRec(self, logrec):
self._log_fifo.append(logrec)
self._log_todo.append(logrec)
self._log_fill.set()

async def _logWindowFiller(self):

while not self.isfini:

await self._log_fill.wait()
if self.isfini:
break

todo = list(self._log_todo)

self._log_todo.clear()
self._log_fill.clear()

if todo:
for window in self._log_windows:
await window.puts(todo)

Check warning on line 1556 in synapse/lib/cell.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/cell.py#L1556

Added line #L1556 was not covered by tests

def getPermDef(self, perm):
perm = tuple(perm)
if self.permlook is None:
Expand Down
Loading