diff --git a/synapse/cortex.py b/synapse/cortex.py index e9885301f03..4177e131b1c 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -4735,35 +4735,64 @@ async def delLayrPull(self, layriden, pulliden): await self.delActiveCoro(pulliden) - async def runLayrPush(self, layr, pdef): - url = pdef.get('url') - iden = pdef.get('iden') - # push() will refire as needed + async def syncLayerNodeEdits(self, layr, offs, wait=True, compat=False, withmeta=False): - async def push(): - taskname = f'layer push: {layr.iden} {iden}' - async with await self.boss.promote(taskname, self.auth.rootuser, background=True): - async with await s_telepath.openurl(url) as proxy: - celliden = await proxy.getCellIden() - compat = celliden == self.iden - await self._pushBulkEdits(layr, proxy, pdef, compat=compat) + layriden = layr.iden - self.addActiveCoro(push, iden=iden) + async def getNexusEdits(strt): + async for nexsoffs, item in self.getNexusChanges(strt, wait=False): + if item[0] != layriden or item[1] != 'edits': + continue - async def runLayrPull(self, layr, pdef): - url = pdef.get('url') - iden = pdef.get('iden') - # pull() will refire as needed + edits = item[2][0] + if compat: + edits = await self.localToRemoteEdits(edits) - async def pull(): - taskname = f'layer pull: {layr.iden} {iden}' - async with await self.boss.promote(taskname, self.auth.rootuser, background=True): - async with await s_telepath.openurl(url) as proxy: - celliden = await proxy.getCellIden() - compat = celliden == self.iden - await self._pushBulkEdits(proxy, layr, pdef, compat=compat) + if withmeta: + yield (nexsoffs, edits, item[2][1]) + else: + yield (nexsoffs, edits) - self.addActiveCoro(pull, iden=iden) + lastoffs = -1 + async for item in getNexusEdits(offs): + lastoffs = item[0] + yield item + + if not wait: + return + + async with layr.getNodeEditWindow() as wind: + + # Ensure we are caught up after grabbing a window + sync = True + maxoffs = max(offs, lastoffs + 1) + + async for item in getNexusEdits(maxoffs): + maxoffs = item[0] + yield item + + async for editoffs, edits, meta in wind: + if sync: + if editoffs <= maxoffs: + continue + sync = False + + if compat: + edits = await self.localToRemoteEdits(edits) + + if withmeta: + yield (editoffs, edits, meta) + else: + yield (editoffs, edits) + + async def getLayerNodeEditGenr(self, layr): + + async def genr(offs, compat): + async for item in self.syncLayerNodeEdits(layr, offs, compat=compat): + yield item + await asyncio.sleep(0) + + return genr async def localToRemoteEdits(self, lnodeedits): rnodeedits = [] @@ -4773,7 +4802,7 @@ async def localToRemoteEdits(self, lnodeedits): redits = [] async for edit in s_coro.pause(ledits): - if edit[0] in (10, 11): + if edit[0] in s_layer.EDIT_EDGE: verb, n2nid = edit[1] if (n2ndef := self.getNidNdef(s_common.int64en(n2nid))) is None: continue @@ -4804,7 +4833,7 @@ async def remoteToLocalEdits(self, rnodeedits): ledits = [] async for edit in s_coro.pause(redits): - if edit[0] in (10, 11): + if edit[0] in s_layer.EDIT_EDGE: verb, n2ndef = edit[1] n2nid = await self.genNdefNid(n2ndef) ledits.append((edit[0], (verb, s_common.int64un(n2nid)))) @@ -4816,7 +4845,37 @@ async def remoteToLocalEdits(self, rnodeedits): return lnodeedits - async def _pushBulkEdits(self, layr0, layr1, pdef, compat): + async def runLayrPush(self, layr, pdef): + url = pdef.get('url') + iden = pdef.get('iden') + # push() will refire as needed + + async def push(): + taskname = f'layer push: {layr.iden} {iden}' + async with await self.boss.promote(taskname, self.auth.rootuser, background=True): + async with await s_telepath.openurl(url) as proxy: + celliden = await proxy.getCellIden() + src = await self.getLayerNodeEditGenr(layr) + await self._pushBulkEdits(src, proxy, pdef, compat=(celliden != self.iden)) + + self.addActiveCoro(push, iden=iden) + + async def runLayrPull(self, layr, pdef): + url = pdef.get('url') + iden = pdef.get('iden') + # pull() will refire as needed + + async def pull(): + taskname = f'layer pull: {layr.iden} {iden}' + async with await self.boss.promote(taskname, self.auth.rootuser, background=True): + async with await s_telepath.openurl(url) as proxy: + celliden = await proxy.getCellIden() + src = proxy.syncNodeEdits + await self._pushBulkEdits(src, layr, pdef, compat=(celliden != self.iden)) + + self.addActiveCoro(pull, iden=iden) + + async def _pushBulkEdits(self, src, dst, pdef, compat): iden = pdef.get('iden') user = pdef.get('user') @@ -4836,7 +4895,7 @@ async def fill(): else: filloffs = soffs - async for item in layr0.syncNodeEdits(filloffs, compat=compat): + async for item in src(filloffs, compat=compat): await queue.put(item) await asyncio.sleep(0) @@ -4860,12 +4919,12 @@ async def fill(): # prevent push->push->push nodeedits growth alledits.extend(edits) if len(alledits) > csize: - await layr1.saveNodeEdits(alledits, meta, compat=compat) + await dst.saveNodeEdits(alledits, meta, compat=compat) await self.setLayrSyncOffs(iden, offs) alledits.clear() if alledits: - await layr1.saveNodeEdits(alledits, meta, compat=compat) + await dst.saveNodeEdits(alledits, meta, compat=compat) await self.setLayrSyncOffs(iden, offs) async def saveLayerNodeEdits(self, layriden, edits, meta, waitiden=None): diff --git a/synapse/lib/layer.py b/synapse/lib/layer.py index 6e9b4b9b8ae..dfc23e48a11 100644 --- a/synapse/lib/layer.py +++ b/synapse/lib/layer.py @@ -255,6 +255,13 @@ async def getIden(self): EDIT_PROGRESS = 100 # (used by syncNodeEdits) (, ()) +EDIT_NODE = {EDIT_NODE_ADD, EDIT_NODE_DEL, EDIT_NODE_TOMB, EDIT_NODE_TOMB_DEL} +EDIT_PROP = {EDIT_PROP_SET, EDIT_PROP_DEL, EDIT_PROP_TOMB, EDIT_PROP_TOMB_DEL} +EDIT_TAG = {EDIT_TAG_SET, EDIT_TAG_DEL, EDIT_TAG_TOMB, EDIT_TAG_TOMB_DEL} +EDIT_TAGPROP = {EDIT_TAGPROP_SET, EDIT_TAGPROP_DEL, EDIT_TAGPROP_TOMB, EDIT_TAGPROP_TOMB_DEL} +EDIT_EDGE = {EDIT_EDGE_ADD, EDIT_EDGE_DEL, EDIT_EDGE_TOMB, EDIT_EDGE_TOMB_DEL} +EDIT_NODEDATA = {EDIT_NODEDATA_SET, EDIT_NODEDATA_DEL, EDIT_NODEDATA_TOMB, EDIT_NODEDATA_TOMB_DEL} + INDX_PROP = b'\x00\x00' INDX_TAGPROP = b'\x00\x01' @@ -5831,55 +5838,9 @@ def getStorNode(self, nid): return collections.defaultdict(dict) async def syncNodeEdits(self, offs, wait=True, compat=False, withmeta=False): - - layriden = self.iden - - async def getNexusEdits(strt): - async for nexsoffs, item in self.core.getNexusChanges(strt, wait=False): - if item[0] != layriden or item[1] != 'edits': - continue - - edits = item[2][0] - if compat: - edits = await self.core.localToRemoteEdits(edits) - - if withmeta: - yield (nexsoffs, edits, item[2][1]) - else: - yield (nexsoffs, edits) - - lastoffs = -1 - async for item in getNexusEdits(offs): - lastoffs = item[0] + async for item in self.core.syncLayerNodeEdits(self, offs, wait=wait, compat=compat, withmeta=withmeta): yield item - if not wait: - return - - async with self.getNodeEditWindow() as wind: - - # Ensure we are caught up after grabbing a window - sync = True - maxoffs = max(offs, lastoffs + 1) - - async for item in getNexusEdits(maxoffs): - maxoffs = item[0] - yield item - - async for editoffs, edits, meta in wind: - if sync: - if editoffs <= maxoffs: - continue - sync = False - - if compat: - edits = await self.core.localToRemoteEdits(edits) - - if withmeta: - yield (editoffs, edits, meta) - else: - yield (editoffs, edits) - @contextlib.asynccontextmanager async def getNodeEditWindow(self): async with await s_queue.Window.anit(maxsize=WINDOW_MAXSIZE) as wind: