diff --git a/changes/7891929d91c287fff5159bf32f4463f3.yaml b/changes/7891929d91c287fff5159bf32f4463f3.yaml new file mode 100644 index 0000000000..b9d22a126b --- /dev/null +++ b/changes/7891929d91c287fff5159bf32f4463f3.yaml @@ -0,0 +1,7 @@ +--- +desc: Fixed a startup race condition that could cause Storm package ``onload`` queries + to run before Storm dmons were loaded. +desc:literal: false +prs: [] +type: bug +... diff --git a/synapse/cortex.py b/synapse/cortex.py index 3f4471e7d4..500a612f3f 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -925,6 +925,7 @@ async def initServiceStorage(self): self.migration = False self._migration_lock = asyncio.Lock() + self._migration_evnt = asyncio.Event() self.stormmods = {} # name: mdef self.stormpkgs = {} # name: pkgdef @@ -1023,6 +1024,7 @@ async def initServiceStorage(self): self.stormdmondefs = self.cortexdata.getSubKeyVal('storm:dmons:') self.stormdmons = await s_storm.DmonManager.anit(self) self.onfini(self.stormdmons) + await self._initStormDmons() self.agenda = await s_agenda.Agenda.anit(self) self.onfini(self.agenda) @@ -1659,14 +1661,9 @@ async def initServiceRuntime(self): await self._initCoreMods() - if self.isactive: - await self._checkLayerModels() - if not self.safemode: self.addActiveCoro(self.agenda.runloop) - await self._initStormDmons() - await self._initStormSvcs() # share ourself via the cell dmon as "cortex" @@ -1678,6 +1675,8 @@ async def initServiceActive(self): await self.stormdmons.start() async def _runMigrations(): + await self.boss.promote('cortex:migration:layers', self.auth.rootuser, background=True, protected=True) + # Run migrations when this cortex becomes active. This is to prevent # migrations getting skipped in a zero-downtime upgrade path # (upgrade mirror, promote mirror). @@ -1694,6 +1693,8 @@ async def _runMigrations(): for pkgdef in list(self.stormpkgs.values()): self._runStormPkgOnload(pkgdef) + self._migration_evnt.set() + await self.initStormPool() self.runActiveTask(_runMigrations()) diff --git a/synapse/lib/boss.py b/synapse/lib/boss.py index 1d95ef1e61..293dd5083a 100644 --- a/synapse/lib/boss.py +++ b/synapse/lib/boss.py @@ -70,7 +70,7 @@ def ps(self): def get(self, iden): return self.tasks.get(iden) - async def promote(self, name, user, info=None, taskiden=None, background=False): + async def promote(self, name, user, info=None, taskiden=None, background=False, protected=False): ''' Promote the currently running task. @@ -79,6 +79,7 @@ async def promote(self, name, user, info=None, taskiden=None, background=False): user: The User who owns the task. taskiden: An optional GUID for the task. info: An optional information dictionary containing information about the task. + protected (bool): Whether the task can be killed with safeKill() Returns: s_task.Task: The Synapse Task object. @@ -86,6 +87,7 @@ async def promote(self, name, user, info=None, taskiden=None, background=False): task = asyncio.current_task() syntask = await self.promotetask(task, name, user, info=info, taskiden=taskiden) + syntask.protected = protected syntask.background = background return syntask diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 1c5d4617a9..03815ea674 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -4760,7 +4760,7 @@ async def killTask(self, iden, peers=True, timeout=None): task = self.boss.get(iden) if task is not None: - await task.kill() + await task.safeKill() return True if not peers: @@ -4790,7 +4790,7 @@ async def kill(self, user, iden): if (task.user.iden == user.iden) or isallowed: logger.info(f'Killing task: {iden}') - await task.kill() + await task.safeKill() logger.info(f'Task killed: {iden}') return True diff --git a/synapse/lib/task.py b/synapse/lib/task.py index c498d2594e..96133b86fd 100644 --- a/synapse/lib/task.py +++ b/synapse/lib/task.py @@ -22,6 +22,7 @@ async def __anit__(self, boss, task, name, user, info=None, root=None, iden=None info = {} self.boss = boss + self.protected = False self.background = False task._syn_task = self @@ -100,6 +101,11 @@ async def kill(self): # task kill and fini are the same... await self.fini() + async def safeKill(self): + if self.protected: + raise s_exc.SynErr(mesg=f'Task {self.name} is protected.') + await self.kill() + def pack(self): pask = { @@ -124,6 +130,8 @@ def packv2(self): 'tick': self.tick, 'user': self.user.iden, 'username': self.user.name, + 'protected': self.protected, + 'background': self.background, 'kids': {i: k.packv2() for i, k in self.kids.items()}, } diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index 6992e7c53b..758ac2cabd 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -7921,44 +7921,6 @@ async def test_cortex_depr_props_warning(self): self.eq(1, data.count('deprecated properties unlocked')) self.isin(f'Detected {count - 4} deprecated properties', data) - async def test_cortex_dmons_after_modelrev(self): - with self.getTestDir() as dirn: - async with self.getTestCore(dirn=dirn) as core: - - # Add a dmon so something gets started - await core.callStorm(''' - $ddef = $lib.dmon.add(${ - $lib.print(hi) - $lib.warn(omg) - $s = `Running {$auto.type} {$auto.iden}` - $lib.log.info($s, ({"iden": $auto.iden})) - }) - ''') - - # Create this so we can find the model rev version before the - # latest - mrev = s_modelrev.ModelRev(core) - - # Add a layer and regress the version so it gets migrated on the - # next start - ldef = await core.addLayer() - layr = core.getLayer(ldef['iden']) - await layr.setModelVers(mrev.revs[-2][0]) - - with self.getLoggerStream('') as stream: - async with self.getTestCore(dirn=dirn) as core: - pass - - stream.seek(0) - data = stream.read() - - # Check that the model migration happens before the dmons start - mrevstart = data.find('beginning model migration') - dmonstart = data.find('Starting Dmon') - self.ne(-1, mrevstart) - self.ne(-1, dmonstart) - self.lt(mrevstart, dmonstart) - async def test_cortex_taxonomy_migr(self): async with self.getRegrCore('2.157.0-taxonomy-rename') as core: @@ -7988,6 +7950,68 @@ async def test_cortex_taxonomy_migr(self): self.none(core.model.formsbyiface.get('taxonomy')) self.isin('_auto:taxonomy', core.model.formsbyiface.get('meta:taxonomy')) + async def test_cortex_modelrev_task(self): + + async def dummy(self): + await asyncio.Future() + + with self.getTestDir() as dirn: + + async with self.getTestCore(dirn=dirn) as core00: + await self.waitForActiveMigration(core00) + + with patch('synapse.lib.modelrev.ModelRev.revCoreLayers', dummy): + conf01 = {'mirror': 'tcp://root:root@127.0.0.1:0'} + async with self.getTestCore(dirn=dirn, conf=conf01) as core01: + + await core01.promote(graceful=False) + await asyncio.sleep(0) + + task = await core01.callStorm(''' + for $task in $lib.task.list() { + if ($task.name = "cortex:migration:layers") { + return($task) + } + } + ''') + self.nn(task) + self.true(task['protected']) + self.true(task['background']) + + self.true(await core01.isCellActive()) + + emesg = 'Task cortex:migration:layers is protected.' + + # cannot kill through exposed Storm APIs + + opts = {'vars': {'iden': task['iden']}} + + with self.raises(s_exc.SynErr) as cm: + await core01.nodes('task.kill $iden', opts=opts) + self.eq(cm.exception.get('mesg'), emesg) + + with self.raises(s_exc.SynErr) as cm: + await core01.nodes('ps.kill $iden', opts=opts) + self.eq(cm.exception.get('mesg'), emesg) + + # cannot kill through exposed Telepath APIs + + async with core01.getLocalProxy() as proxy: + + with self.raises(s_exc.SynErr) as cm: + await proxy.killTask(task['iden']) + self.eq(cm.exception.get('mesg'), emesg) + + with self.raises(s_exc.SynErr) as cm: + await proxy.kill(task['iden']) + self.eq(cm.exception.get('mesg'), emesg) + + # internal kill still works + + self.nn(rtask := core01.boss.get(task['iden'])) + await rtask.kill() + self.none(core01.boss.get(task['iden'])) + async def test_cortex_vaults(self): ''' Simple usage testing. diff --git a/synapse/tests/test_lib_modelrev.py b/synapse/tests/test_lib_modelrev.py index 6e818ea62e..d8eabc2b97 100644 --- a/synapse/tests/test_lib_modelrev.py +++ b/synapse/tests/test_lib_modelrev.py @@ -146,8 +146,9 @@ async def test_modelrev_0_2_7_mirror(self): conf00 = {'nexslog:en': True} async with self.getTestCore(dirn=regrdir00, conf=conf00) as core00: + await self.waitForActiveMigration(core00) - self.true(await core00.getLayer().getModelVers() >= (0, 2, 7)) + self.ge(await core00.getLayer().getModelVers(), (0, 2, 7)) conf01 = {'nexslog:en': True, 'mirror': core00.getLocalUrl()} @@ -168,7 +169,7 @@ async def test_modelrev_0_2_7_mirror(self): await core01.sync() - self.true(await core01.getLayer().getModelVers() >= (0, 2, 7)) + self.ge(await core01.getLayer().getModelVers(), (0, 2, 7)) nodes = await core01.nodes('inet:fqdn=baz.com') self.len(1, nodes) diff --git a/synapse/tests/test_lib_nexus.py b/synapse/tests/test_lib_nexus.py index 1f247f1d9c..cbe391a75a 100644 --- a/synapse/tests/test_lib_nexus.py +++ b/synapse/tests/test_lib_nexus.py @@ -221,6 +221,8 @@ async def test_nexus_migration(self): with self.getRegrDir('cortexes', 'reindex-byarray3') as regrdirn: slabsize00 = s_common.getDirSize(regrdirn) async with self.getTestCore(dirn=regrdirn) as core00: + await self.waitForActiveMigration(core00) + slabsize01 = s_common.getDirSize(regrdirn) # Ensure that realsize hasn't grown wildly. That would be indicative # of a sparse file copy and not a directory move. @@ -228,7 +230,7 @@ async def test_nexus_migration(self): nexsindx = await core00.getNexsIndx() layrindx = max([await layr.getEditIndx() for layr in core00.layers.values()]) - self.gt(nexsindx, layrindx) + self.ge(nexsindx, layrindx) retn = await core00.nexsroot.nexslog.get(0) self.nn(retn) diff --git a/synapse/tests/test_lib_storm.py b/synapse/tests/test_lib_storm.py index ed067d1f99..ca8ca9e711 100644 --- a/synapse/tests/test_lib_storm.py +++ b/synapse/tests/test_lib_storm.py @@ -7,6 +7,7 @@ import synapse.exc as s_exc import synapse.common as s_common +import synapse.cortex as s_cortex import synapse.telepath as s_telepath import synapse.datamodel as s_datamodel @@ -2696,6 +2697,8 @@ async def test_storm_dmon_query_state(self): ddef = await core02.callStorm(q) self.nn(ddef['iden']) + # getStormDmons is a from_leader API so make sure it has applied to change + await core02.sync() dmons = await core02.getStormDmons() self.len(1, dmons) self.eq(dmons[0]['iden'], ddef['iden']) @@ -2842,6 +2845,43 @@ async def test_storm_undef(self): with self.raises(s_exc.NoSuchVar): await core.callStorm('$foo = 10 $foo = $lib.undef return($foo)') + async def test_storm_pkg_onload_bootup(self): + # verify that when the pkg onload handler is called it has access to the expected data + orig = s_cortex.Cortex._runStormPkgOnload + syntest = self + + pkg = { + 'name': 'testload', + 'version': '0.3.0', + 'onload': '$lib.print(hello)', + } + + def _runStormPkgOnload(self, pkgdef): + syntest.len(1, self.stormdmons.getDmonDefs()) + return orig(self, pkgdef) + + with self.getTestDir() as dirn: + + with mock.patch('synapse.cortex.Cortex._runStormPkgOnload', new=_runStormPkgOnload): + + with self.getAsyncLoggerStream('synapse.cortex', 'testload finished onload') as stream: + async with self.getTestCore(dirn=dirn) as core: + + self.len(0, core.stormdmons.getDmonDefs()) + await core.nodes('$lib.dmon.add(${})') + self.len(1, core.stormdmons.getDmonDefs()) + + await core.addStormPkg(pkg) + + self.true(await stream.wait(timeout=10)) + + with self.getAsyncLoggerStream('synapse.cortex', 'testload finished onload') as stream: + async with self.getTestCore(dirn=dirn) as core: + + self.len(1, core.stormdmons.getDmonDefs()) + + self.true(await stream.wait(timeout=10)) + async def test_storm_pkg_onload_active(self): pkg = { 'name': 'testload', diff --git a/synapse/tests/test_lib_task.py b/synapse/tests/test_lib_task.py index 8f62f56d16..f0dba75ff1 100644 --- a/synapse/tests/test_lib_task.py +++ b/synapse/tests/test_lib_task.py @@ -41,7 +41,8 @@ async def test_task_module(self): self.nn(ret.pop('iden')) self.nn(ret.pop('tick')) self.eq(ret, {'name': 'test', 'info': {'hehe': 'haha'}, - 'user': root.iden, 'username': 'root', 'kids': {}}) + 'user': root.iden, 'username': 'root', 'kids': {}, + 'protected': False, 'background': False}) async def test_taskvars(self): s_task.varset('test', 'foo') diff --git a/synapse/tests/utils.py b/synapse/tests/utils.py index dce35e2fc3..f118c0ea1e 100644 --- a/synapse/tests/utils.py +++ b/synapse/tests/utils.py @@ -1116,6 +1116,12 @@ def getRegrDir(self, *path): with self.getTestDir(copyfrom=dirn) as regrdir: yield regrdir + async def waitForActiveMigration(self, core): + ''' + Wait for any tasks that may occur after anit() returns the object. + ''' + self.true(await s_coro.event_wait(core._migration_evnt)) + @contextlib.asynccontextmanager async def getRegrCore(self, vers, conf=None, maxvers=None): with self.withNexusReplay(): @@ -1130,9 +1136,13 @@ def __init__(self, core): with mock.patch.object(s_modelrev, 'ModelRev', ModelRev): async with await s_cortex.Cortex.anit(dirn, conf=conf) as core: + if not core.conf.get('mirror'): + await self.waitForActiveMigration(core) yield core else: async with await s_cortex.Cortex.anit(dirn, conf=conf) as core: + if not core.conf.get('mirror'): + await self.waitForActiveMigration(core) yield core @contextlib.asynccontextmanager