Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 89 additions & 30 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4735,35 +4735,64 @@ async def delLayrPull(self, layriden, pulliden):

await self.delActiveCoro(pulliden)

async def runLayrPush(self, layr, pdef):
url = pdef.get('url')
iden = pdef.get('iden')
# push() will refire as needed
async def syncLayerNodeEdits(self, layr, offs, wait=True, compat=False, withmeta=False):

async def push():
taskname = f'layer push: {layr.iden} {iden}'
async with await self.boss.promote(taskname, self.auth.rootuser, background=True):
async with await s_telepath.openurl(url) as proxy:
celliden = await proxy.getCellIden()
compat = celliden == self.iden
await self._pushBulkEdits(layr, proxy, pdef, compat=compat)
layriden = layr.iden

self.addActiveCoro(push, iden=iden)
async def getNexusEdits(strt):
async for nexsoffs, item in self.getNexusChanges(strt, wait=False):
if item[0] != layriden or item[1] != 'edits':
continue

async def runLayrPull(self, layr, pdef):
url = pdef.get('url')
iden = pdef.get('iden')
# pull() will refire as needed
edits = item[2][0]
if compat:
edits = await self.localToRemoteEdits(edits)

async def pull():
taskname = f'layer pull: {layr.iden} {iden}'
async with await self.boss.promote(taskname, self.auth.rootuser, background=True):
async with await s_telepath.openurl(url) as proxy:
celliden = await proxy.getCellIden()
compat = celliden == self.iden
await self._pushBulkEdits(proxy, layr, pdef, compat=compat)
if withmeta:
yield (nexsoffs, edits, item[2][1])
else:
yield (nexsoffs, edits)

self.addActiveCoro(pull, iden=iden)
lastoffs = -1
async for item in getNexusEdits(offs):
lastoffs = item[0]
yield item

if not wait:
return

async with layr.getNodeEditWindow() as wind:

# Ensure we are caught up after grabbing a window
sync = True
maxoffs = max(offs, lastoffs + 1)

async for item in getNexusEdits(maxoffs):
maxoffs = item[0]
yield item

async for editoffs, edits, meta in wind:
if sync:
if editoffs <= maxoffs:
continue
sync = False

if compat:
edits = await self.localToRemoteEdits(edits)

if withmeta:
yield (editoffs, edits, meta)
else:
yield (editoffs, edits)

async def getLayerNodeEditGenr(self, layr):

async def genr(offs, compat):
async for item in self.syncLayerNodeEdits(layr, offs, compat=compat):
yield item
await asyncio.sleep(0)

return genr

async def localToRemoteEdits(self, lnodeedits):
rnodeedits = []
Expand All @@ -4773,7 +4802,7 @@ async def localToRemoteEdits(self, lnodeedits):

redits = []
async for edit in s_coro.pause(ledits):
if edit[0] in (10, 11):
if edit[0] in s_layer.EDIT_EDGE:
verb, n2nid = edit[1]
if (n2ndef := self.getNidNdef(s_common.int64en(n2nid))) is None:
continue
Expand Down Expand Up @@ -4804,7 +4833,7 @@ async def remoteToLocalEdits(self, rnodeedits):

ledits = []
async for edit in s_coro.pause(redits):
if edit[0] in (10, 11):
if edit[0] in s_layer.EDIT_EDGE:
verb, n2ndef = edit[1]
n2nid = await self.genNdefNid(n2ndef)
ledits.append((edit[0], (verb, s_common.int64un(n2nid))))
Expand All @@ -4816,7 +4845,37 @@ async def remoteToLocalEdits(self, rnodeedits):

return lnodeedits

async def _pushBulkEdits(self, layr0, layr1, pdef, compat):
async def runLayrPush(self, layr, pdef):
url = pdef.get('url')
iden = pdef.get('iden')
# push() will refire as needed

async def push():
taskname = f'layer push: {layr.iden} {iden}'
async with await self.boss.promote(taskname, self.auth.rootuser, background=True):
async with await s_telepath.openurl(url) as proxy:
celliden = await proxy.getCellIden()
src = await self.getLayerNodeEditGenr(layr)
await self._pushBulkEdits(src, proxy, pdef, compat=(celliden != self.iden))

self.addActiveCoro(push, iden=iden)

async def runLayrPull(self, layr, pdef):
url = pdef.get('url')
iden = pdef.get('iden')
# pull() will refire as needed

async def pull():
taskname = f'layer pull: {layr.iden} {iden}'
async with await self.boss.promote(taskname, self.auth.rootuser, background=True):
async with await s_telepath.openurl(url) as proxy:
celliden = await proxy.getCellIden()
src = proxy.syncNodeEdits
await self._pushBulkEdits(src, layr, pdef, compat=(celliden != self.iden))

self.addActiveCoro(pull, iden=iden)

async def _pushBulkEdits(self, src, dst, pdef, compat):

iden = pdef.get('iden')
user = pdef.get('user')
Expand All @@ -4836,7 +4895,7 @@ async def fill():
else:
filloffs = soffs

async for item in layr0.syncNodeEdits(filloffs, compat=compat):
async for item in src(filloffs, compat=compat):
await queue.put(item)
await asyncio.sleep(0)

Expand All @@ -4860,12 +4919,12 @@ async def fill():
# prevent push->push->push nodeedits growth
alledits.extend(edits)
if len(alledits) > csize:
await layr1.saveNodeEdits(alledits, meta, compat=compat)
await dst.saveNodeEdits(alledits, meta, compat=compat)
await self.setLayrSyncOffs(iden, offs)
alledits.clear()

if alledits:
await layr1.saveNodeEdits(alledits, meta, compat=compat)
await dst.saveNodeEdits(alledits, meta, compat=compat)
await self.setLayrSyncOffs(iden, offs)

async def saveLayerNodeEdits(self, layriden, edits, meta, waitiden=None):
Expand Down
55 changes: 8 additions & 47 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ async def getIden(self):

EDIT_PROGRESS = 100 # (used by syncNodeEdits) (<etyp>, ())

EDIT_NODE = {EDIT_NODE_ADD, EDIT_NODE_DEL, EDIT_NODE_TOMB, EDIT_NODE_TOMB_DEL}
EDIT_PROP = {EDIT_PROP_SET, EDIT_PROP_DEL, EDIT_PROP_TOMB, EDIT_PROP_TOMB_DEL}
EDIT_TAG = {EDIT_TAG_SET, EDIT_TAG_DEL, EDIT_TAG_TOMB, EDIT_TAG_TOMB_DEL}
EDIT_TAGPROP = {EDIT_TAGPROP_SET, EDIT_TAGPROP_DEL, EDIT_TAGPROP_TOMB, EDIT_TAGPROP_TOMB_DEL}
EDIT_EDGE = {EDIT_EDGE_ADD, EDIT_EDGE_DEL, EDIT_EDGE_TOMB, EDIT_EDGE_TOMB_DEL}
EDIT_NODEDATA = {EDIT_NODEDATA_SET, EDIT_NODEDATA_DEL, EDIT_NODEDATA_TOMB, EDIT_NODEDATA_TOMB_DEL}

INDX_PROP = b'\x00\x00'
INDX_TAGPROP = b'\x00\x01'

Expand Down Expand Up @@ -5831,55 +5838,9 @@ def getStorNode(self, nid):
return collections.defaultdict(dict)

async def syncNodeEdits(self, offs, wait=True, compat=False, withmeta=False):

layriden = self.iden

async def getNexusEdits(strt):
async for nexsoffs, item in self.core.getNexusChanges(strt, wait=False):
if item[0] != layriden or item[1] != 'edits':
continue

edits = item[2][0]
if compat:
edits = await self.core.localToRemoteEdits(edits)

if withmeta:
yield (nexsoffs, edits, item[2][1])
else:
yield (nexsoffs, edits)

lastoffs = -1
async for item in getNexusEdits(offs):
lastoffs = item[0]
async for item in self.core.syncLayerNodeEdits(self, offs, wait=wait, compat=compat, withmeta=withmeta):
yield item

if not wait:
return

async with self.getNodeEditWindow() as wind:

# Ensure we are caught up after grabbing a window
sync = True
maxoffs = max(offs, lastoffs + 1)

async for item in getNexusEdits(maxoffs):
maxoffs = item[0]
yield item

async for editoffs, edits, meta in wind:
if sync:
if editoffs <= maxoffs:
continue
sync = False

if compat:
edits = await self.core.localToRemoteEdits(edits)

if withmeta:
yield (editoffs, edits, meta)
else:
yield (editoffs, edits)

@contextlib.asynccontextmanager
async def getNodeEditWindow(self):
async with await s_queue.Window.anit(maxsize=WINDOW_MAXSIZE) as wind:
Expand Down