Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changes/7891929d91c287fff5159bf32f4463f3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
desc: Fixed a startup race condition that could cause Storm package ``onload`` queries
to run before Storm dmons were loaded.
desc:literal: false
prs: []
type: bug
...
11 changes: 6 additions & 5 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ async def initServiceStorage(self):

self.migration = False
self._migration_lock = asyncio.Lock()
self._migration_evnt = asyncio.Event()

self.stormmods = {} # name: mdef
self.stormpkgs = {} # name: pkgdef
Expand Down Expand Up @@ -1023,6 +1024,7 @@ async def initServiceStorage(self):
self.stormdmondefs = self.cortexdata.getSubKeyVal('storm:dmons:')
self.stormdmons = await s_storm.DmonManager.anit(self)
self.onfini(self.stormdmons)
await self._initStormDmons()

self.agenda = await s_agenda.Agenda.anit(self)
self.onfini(self.agenda)
Expand Down Expand Up @@ -1659,14 +1661,9 @@ async def initServiceRuntime(self):

await self._initCoreMods()

if self.isactive:
await self._checkLayerModels()

if not self.safemode:
self.addActiveCoro(self.agenda.runloop)

await self._initStormDmons()

await self._initStormSvcs()

# share ourself via the cell dmon as "cortex"
Expand All @@ -1678,6 +1675,8 @@ async def initServiceActive(self):
await self.stormdmons.start()

async def _runMigrations():
await self.boss.promote('cortex:migration:layers', self.auth.rootuser, background=True, protected=True)

# Run migrations when this cortex becomes active. This is to prevent
# migrations getting skipped in a zero-downtime upgrade path
# (upgrade mirror, promote mirror).
Expand All @@ -1694,6 +1693,8 @@ async def _runMigrations():
for pkgdef in list(self.stormpkgs.values()):
self._runStormPkgOnload(pkgdef)

self._migration_evnt.set()

await self.initStormPool()

self.runActiveTask(_runMigrations())
Expand Down
4 changes: 3 additions & 1 deletion synapse/lib/boss.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def ps(self):
def get(self, iden):
return self.tasks.get(iden)

async def promote(self, name, user, info=None, taskiden=None, background=False):
async def promote(self, name, user, info=None, taskiden=None, background=False, protected=False):
'''
Promote the currently running task.

Expand All @@ -79,13 +79,15 @@ async def promote(self, name, user, info=None, taskiden=None, background=False):
user: The User who owns the task.
taskiden: An optional GUID for the task.
info: An optional information dictionary containing information about the task.
protected (bool): Whether the task can be killed with safeKill()

Returns:
s_task.Task: The Synapse Task object.
'''
task = asyncio.current_task()

syntask = await self.promotetask(task, name, user, info=info, taskiden=taskiden)
syntask.protected = protected
syntask.background = background

return syntask
Expand Down
4 changes: 2 additions & 2 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -4760,7 +4760,7 @@ async def killTask(self, iden, peers=True, timeout=None):

task = self.boss.get(iden)
if task is not None:
await task.kill()
await task.safeKill()
return True

if not peers:
Expand Down Expand Up @@ -4790,7 +4790,7 @@ async def kill(self, user, iden):

if (task.user.iden == user.iden) or isallowed:
logger.info(f'Killing task: {iden}')
await task.kill()
await task.safeKill()
logger.info(f'Task killed: {iden}')
return True

Expand Down
8 changes: 8 additions & 0 deletions synapse/lib/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def __anit__(self, boss, task, name, user, info=None, root=None, iden=None
info = {}

self.boss = boss
self.protected = False
self.background = False

task._syn_task = self
Expand Down Expand Up @@ -100,6 +101,11 @@ async def kill(self):
# task kill and fini are the same...
await self.fini()

async def safeKill(self):
if self.protected:
raise s_exc.SynErr(mesg=f'Task {self.name} is protected.')
await self.kill()

def pack(self):

pask = {
Expand All @@ -124,6 +130,8 @@ def packv2(self):
'tick': self.tick,
'user': self.user.iden,
'username': self.user.name,
'protected': self.protected,
'background': self.background,
'kids': {i: k.packv2() for i, k in self.kids.items()},
}

Expand Down
100 changes: 62 additions & 38 deletions synapse/tests/test_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -7921,44 +7921,6 @@ async def test_cortex_depr_props_warning(self):
self.eq(1, data.count('deprecated properties unlocked'))
self.isin(f'Detected {count - 4} deprecated properties', data)

async def test_cortex_dmons_after_modelrev(self):
with self.getTestDir() as dirn:
async with self.getTestCore(dirn=dirn) as core:

# Add a dmon so something gets started
await core.callStorm('''
$ddef = $lib.dmon.add(${
$lib.print(hi)
$lib.warn(omg)
$s = `Running {$auto.type} {$auto.iden}`
$lib.log.info($s, ({"iden": $auto.iden}))
})
''')

# Create this so we can find the model rev version before the
# latest
mrev = s_modelrev.ModelRev(core)

# Add a layer and regress the version so it gets migrated on the
# next start
ldef = await core.addLayer()
layr = core.getLayer(ldef['iden'])
await layr.setModelVers(mrev.revs[-2][0])

with self.getLoggerStream('') as stream:
async with self.getTestCore(dirn=dirn) as core:
pass

stream.seek(0)
data = stream.read()

# Check that the model migration happens before the dmons start
mrevstart = data.find('beginning model migration')
dmonstart = data.find('Starting Dmon')
self.ne(-1, mrevstart)
self.ne(-1, dmonstart)
self.lt(mrevstart, dmonstart)

async def test_cortex_taxonomy_migr(self):

async with self.getRegrCore('2.157.0-taxonomy-rename') as core:
Expand Down Expand Up @@ -7988,6 +7950,68 @@ async def test_cortex_taxonomy_migr(self):
self.none(core.model.formsbyiface.get('taxonomy'))
self.isin('_auto:taxonomy', core.model.formsbyiface.get('meta:taxonomy'))

async def test_cortex_modelrev_task(self):

async def dummy(self):
await asyncio.Future()

with self.getTestDir() as dirn:

async with self.getTestCore(dirn=dirn) as core00:
await self.waitForActiveMigration(core00)

with patch('synapse.lib.modelrev.ModelRev.revCoreLayers', dummy):
conf01 = {'mirror': 'tcp://root:root@127.0.0.1:0'}
async with self.getTestCore(dirn=dirn, conf=conf01) as core01:

await core01.promote(graceful=False)
await asyncio.sleep(0)

task = await core01.callStorm('''
for $task in $lib.task.list() {
if ($task.name = "cortex:migration:layers") {
return($task)
}
}
''')
self.nn(task)
self.true(task['protected'])
self.true(task['background'])

self.true(await core01.isCellActive())

emesg = 'Task cortex:migration:layers is protected.'

# cannot kill through exposed Storm APIs

opts = {'vars': {'iden': task['iden']}}

with self.raises(s_exc.SynErr) as cm:
await core01.nodes('task.kill $iden', opts=opts)
self.eq(cm.exception.get('mesg'), emesg)

with self.raises(s_exc.SynErr) as cm:
await core01.nodes('ps.kill $iden', opts=opts)
self.eq(cm.exception.get('mesg'), emesg)

# cannot kill through exposed Telepath APIs

async with core01.getLocalProxy() as proxy:

with self.raises(s_exc.SynErr) as cm:
await proxy.killTask(task['iden'])
self.eq(cm.exception.get('mesg'), emesg)

with self.raises(s_exc.SynErr) as cm:
await proxy.kill(task['iden'])
self.eq(cm.exception.get('mesg'), emesg)

# internal kill still works

self.nn(rtask := core01.boss.get(task['iden']))
await rtask.kill()
self.none(core01.boss.get(task['iden']))

async def test_cortex_vaults(self):
'''
Simple usage testing.
Expand Down
5 changes: 3 additions & 2 deletions synapse/tests/test_lib_modelrev.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ async def test_modelrev_0_2_7_mirror(self):
conf00 = {'nexslog:en': True}

async with self.getTestCore(dirn=regrdir00, conf=conf00) as core00:
await self.waitForActiveMigration(core00)

self.true(await core00.getLayer().getModelVers() >= (0, 2, 7))
self.ge(await core00.getLayer().getModelVers(), (0, 2, 7))

conf01 = {'nexslog:en': True, 'mirror': core00.getLocalUrl()}

Expand All @@ -168,7 +169,7 @@ async def test_modelrev_0_2_7_mirror(self):

await core01.sync()

self.true(await core01.getLayer().getModelVers() >= (0, 2, 7))
self.ge(await core01.getLayer().getModelVers(), (0, 2, 7))

nodes = await core01.nodes('inet:fqdn=baz.com')
self.len(1, nodes)
Expand Down
4 changes: 3 additions & 1 deletion synapse/tests/test_lib_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,16 @@ async def test_nexus_migration(self):
with self.getRegrDir('cortexes', 'reindex-byarray3') as regrdirn:
slabsize00 = s_common.getDirSize(regrdirn)
async with self.getTestCore(dirn=regrdirn) as core00:
await self.waitForActiveMigration(core00)

slabsize01 = s_common.getDirSize(regrdirn)
# Ensure that realsize hasn't grown wildly. That would be indicative
# of a sparse file copy and not a directory move.
self.lt(slabsize01[0], 3 * slabsize00[0])

nexsindx = await core00.getNexsIndx()
layrindx = max([await layr.getEditIndx() for layr in core00.layers.values()])
self.gt(nexsindx, layrindx)
self.ge(nexsindx, layrindx)

retn = await core00.nexsroot.nexslog.get(0)
self.nn(retn)
Expand Down
40 changes: 40 additions & 0 deletions synapse/tests/test_lib_storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.cortex as s_cortex
import synapse.telepath as s_telepath
import synapse.datamodel as s_datamodel

Expand Down Expand Up @@ -2696,6 +2697,8 @@ async def test_storm_dmon_query_state(self):
ddef = await core02.callStorm(q)
self.nn(ddef['iden'])

# getStormDmons is a from_leader API so make sure it has applied to change
await core02.sync()
dmons = await core02.getStormDmons()
self.len(1, dmons)
self.eq(dmons[0]['iden'], ddef['iden'])
Expand Down Expand Up @@ -2842,6 +2845,43 @@ async def test_storm_undef(self):
with self.raises(s_exc.NoSuchVar):
await core.callStorm('$foo = 10 $foo = $lib.undef return($foo)')

async def test_storm_pkg_onload_bootup(self):
# verify that when the pkg onload handler is called it has access to the expected data
orig = s_cortex.Cortex._runStormPkgOnload
syntest = self

pkg = {
'name': 'testload',
'version': '0.3.0',
'onload': '$lib.print(hello)',
}

def _runStormPkgOnload(self, pkgdef):
syntest.len(1, self.stormdmons.getDmonDefs())
return orig(self, pkgdef)

with self.getTestDir() as dirn:

with mock.patch('synapse.cortex.Cortex._runStormPkgOnload', new=_runStormPkgOnload):

with self.getAsyncLoggerStream('synapse.cortex', 'testload finished onload') as stream:
async with self.getTestCore(dirn=dirn) as core:

self.len(0, core.stormdmons.getDmonDefs())
await core.nodes('$lib.dmon.add(${})')
self.len(1, core.stormdmons.getDmonDefs())

await core.addStormPkg(pkg)

self.true(await stream.wait(timeout=10))

with self.getAsyncLoggerStream('synapse.cortex', 'testload finished onload') as stream:
async with self.getTestCore(dirn=dirn) as core:

self.len(1, core.stormdmons.getDmonDefs())

self.true(await stream.wait(timeout=10))

async def test_storm_pkg_onload_active(self):
pkg = {
'name': 'testload',
Expand Down
3 changes: 2 additions & 1 deletion synapse/tests/test_lib_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ async def test_task_module(self):
self.nn(ret.pop('iden'))
self.nn(ret.pop('tick'))
self.eq(ret, {'name': 'test', 'info': {'hehe': 'haha'},
'user': root.iden, 'username': 'root', 'kids': {}})
'user': root.iden, 'username': 'root', 'kids': {},
'protected': False, 'background': False})

async def test_taskvars(self):
s_task.varset('test', 'foo')
Expand Down
10 changes: 10 additions & 0 deletions synapse/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,12 @@ def getRegrDir(self, *path):
with self.getTestDir(copyfrom=dirn) as regrdir:
yield regrdir

async def waitForActiveMigration(self, core):
'''
Wait for any tasks that may occur after anit() returns the object.
'''
self.true(await s_coro.event_wait(core._migration_evnt))

@contextlib.asynccontextmanager
async def getRegrCore(self, vers, conf=None, maxvers=None):
with self.withNexusReplay():
Expand All @@ -1130,9 +1136,13 @@ def __init__(self, core):

with mock.patch.object(s_modelrev, 'ModelRev', ModelRev):
async with await s_cortex.Cortex.anit(dirn, conf=conf) as core:
if not core.conf.get('mirror'):
await self.waitForActiveMigration(core)
yield core
else:
async with await s_cortex.Cortex.anit(dirn, conf=conf) as core:
if not core.conf.get('mirror'):
await self.waitForActiveMigration(core)
yield core

@contextlib.asynccontextmanager
Expand Down