diff --git a/synapse/lib/aha.py b/synapse/lib/aha.py
index 95f4c8a11d4..9e64edb26e9 100644
--- a/synapse/lib/aha.py
+++ b/synapse/lib/aha.py
@@ -61,6 +61,9 @@
 }
 provSvcSchema = s_config.getJsValidator(_provSvcSchema)
 
+STATE_LEAD = 0  # lead...
+STATE_FOLLOW = 1  # follow...
+STATE_INVALID = 2  # or get out of the way!
 
 class AhaProvisionServiceV1(s_httpapi.Handler):
 
@@ -367,6 +370,30 @@ async def clearAhaUserEnrolls(self):
         '''
         return await self.cell.clearAhaUserEnrolls()
 
+    async def mayLeadTerm(self, iden, name, term, nexs):
+        '''
+        Check with AHA whether the named service may lead the cluster.
+        '''
+        network = self.cell.conf.get('aha:network')
+        # If we could register the service, we can lead the service...
+        await self._reqUserAllowed(('aha', 'service', 'add', network, name))
+        return await self.cell.mayLeadTerm(iden, name, term, nexs)
+
+    async def nextLeadTerm(self, iden, name, nexs):
+        '''
+        Start a new leader term for the cluster with the named service as leader.
+        '''
+        network = self.cell.conf.get('aha:network')
+        # Same permission as mayLeadTerm(): if we could register it, we can lead it.
+        await self._reqUserAllowed(('aha', 'service', 'add', network, name))
+        return await self.cell.nextLeadTerm(iden, name, nexs)
+
+    async def getLeadTerm(self, iden):
+        '''
+        Get the current leader term for the specified service cluster.
+        '''
+        return await self.cell.getLeadTerm(iden)
+
     @s_cell.adminapi()
     async def clearAhaClones(self):
         '''
@@ -579,6 +606,7 @@ async def _initCellBoot(self):
     async def initServiceStorage(self):
 
         self.features['callpeers'] = 1
+        self.features['leadterms'] = 1
 
         dirn = s_common.gendir(self.dirn, 'slabs', 'jsonstor')
 
@@ -596,6 +624,8 @@ async def fini():
         self.slab.initdb('aha:provs')
         self.slab.initdb('aha:enrolls')
 
+        self.slab.initdb('aha:leadterms')
+
         self.slab.initdb('aha:clones')
         self.slab.initdb('aha:servers')
 
@@ -1407,6 +1437,108 @@ async def signUserCsr(self, csrtext, signas=None):
 
         return self.certdir._certToByts(cert).decode()
 
+    async def getLeadTerm(self, iden):
+        '''
+        Get the current leader term for the specified service cluster.
+
+        Returns:
+            (dict): The stored leader term definition, or None if no term is stored.
+        '''
+        lkey = s_common.uhex(iden)
+        byts = self.slab.get(b'\x00' + lkey, db='aha:leadterms')
+        if byts is not None:
+            return s_msgpack.un(byts)
+
+    @s_cell.from_leader
+    async def setLeadTerm(self, termdef):
+        '''
+        Set the current leader term for the specified service cluster.
+
+        NOTE: The termdef is updated in place with the current time before validation.
+        '''
+        termdef['time'] = s_common.now()
+        # NOTE(review): this patch adds no import of the schemas module to this
+        # file; confirm the s_schema alias is already in scope.
+        s_schema.reqValidLeadTerm(termdef)
+        return await self._push('aha:lead:set', termdef)
+
+    @s_nexus.Pusher.onPush('aha:lead:set')
+    async def _setLeadTerm(self, termdef):
+        s_schema.reqValidLeadTerm(termdef)
+        # leader terms are keyed by the cluster iden bytes with a \x00 prefix
+        lkey = s_common.uhex(termdef.get('iden'))
+        self.slab.put(b'\x00' + lkey, s_msgpack.en(termdef), db='aha:leadterms')
+
+    @s_cell.from_leader
+    async def nextLeadTerm(self, iden, name, nexs):
+        '''
+        Force a change in leadership for the given cluster iden.
+
+        Returns:
+            (dict): The newly stored leader term definition.
+        '''
+        async with self.nexslock:
+
+            term = 0
+            leadterm = await self.getLeadTerm(iden)
+            if leadterm is not None:
+                term = leadterm.get('term') + 1
+
+            newterm = {
+                'iden': iden,
+                'name': name,
+                'term': term,
+                'nexs': nexs,
+            }
+            await self.setLeadTerm(newterm)
+            return newterm
+
+    @s_cell.from_leader
+    async def mayLeadTerm(self, iden, name, term, nexs):
+        '''
+        Determine if the caller is still the valid leader.
+        Returns: (state, leadterm) tuple.
+
+        The state may be one of the following values:
+            STATE_LEAD: you are the leader
+            STATE_FOLLOW: you must become a follower
+            STATE_INVALID: you are divergent and must reprovision.
+        '''
+
+        # we hold the nexus lock so this may only be run on the leader
+        # NOTE(review): setLeadTerm() issues a nexus push while nexslock is held;
+        # confirm the nexus apply path does not also acquire nexslock.
+        async with self.nexslock:
+
+            leadterm = await self.getLeadTerm(iden)
+            if leadterm is None:
+                leadterm = {
+                    'iden': iden,
+                    'name': name,
+                    'term': term,
+                    'nexs': nexs,
+                }
+                await self.setLeadTerm(leadterm)
+                return (STATE_LEAD, leadterm)
+
+            # if we were the last known leader, we can jump right in
+            if leadterm.get('name') == name:
+                return (STATE_LEAD, leadterm)
+
+            # has someone else taken the lead in our absence?
+            if leadterm.get('term') > term:
+
+                # if someone else was promoted and their nexs index
+                # was less than ours at the time, we are divergent :(
+                if leadterm.get('nexs') < nexs:
+                    return (STATE_INVALID, leadterm)
+
+                # if they took over with an equal or greater nexs index
+                # we can become a follower...
+                return (STATE_FOLLOW, leadterm)
+
+            return (STATE_INVALID, leadterm)
+
     async def getAhaUrls(self, user='root'):
 
         # for backward compat...
diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py
index 3d36926b61b..c2c565abb76 100644
--- a/synapse/lib/cell.py
+++ b/synapse/lib/cell.py
@@ -1286,6 +1286,7 @@ async def fini():
 
         # for runtime cell configuration values
         self.slab.initdb('cell:conf')
+        self.slab.initdb('cell:meta')
 
         self._sslctx_cache = s_cache.FixedCache(self._makeCachedSslCtx, size=SSLCTX_CACHE_SIZE)
 
@@ -1371,6 +1372,23 @@ async def fini():
         # phase 5 - service networking
         await self.initServiceNetwork()
 
+    def getCellMeta(self, name, defv=None):
+        '''
+        Get a locally stored cell metadata value by name, or defv if it is not set.
+        '''
+        byts = self.slab.get(name.encode(), db='cell:meta')
+        if byts is not None:
+            return s_msgpack.un(byts)
+        return defv
+
+    def setCellMeta(self, name, valu):
+        '''
+        Store a cell metadata value by name in the local slab and return the value.
+        '''
+        # NOTE: these changes are NOT nexus enabled!
+        self.slab.put(name.encode(), s_msgpack.en(valu), db='cell:meta')
+        return valu
+
     async def _storCellHiveMigration(self):
         logger.warning(f'migrating Cell ({self.getCellType()}) info out of hive')
 
@@ -1738,9 +1756,91 @@ async def _runSysctlLoop(self):
 
             await self.waitfini(self.SYSCTL_CHECK_FREQ)
 
+    async def _askAhaToLead(self, proxy=None):
+        '''
+        Ask AHA whether this service may lead its cluster and apply the answer.
+
+        Args:
+            proxy: An optional AHA proxy to use. If None, one is requested from
+                   the AHA client, retrying until it succeeds or the cell is fini.
+        '''
+        while not self.isfini:
+
+            if proxy is None:
+                try:
+                    proxy = await self.ahaclient.proxy(timeout=4)
+                except Exception as e:
+                    logger.error(f'Error getting AHA proxy to check leadership: {e}')
+                    # wait before retrying rather than spinning on a dead link
+                    await self.waitfini(1)
+                    continue
+
+            if not proxy._hasTeleFeat('leadterms'):
+                logger.warning('AHA Server does not support tracking leadership terms. Please update!')
+                return
+
+            async with self.nexslock:
+
+                name = self.conf.get('aha:name')
+                nexs = await self.getNexsIndx()
+                term = self.getCellMeta('aha:term', 0)
+
+                state, leadterm = await proxy.mayLeadTerm(self.iden, name, term, nexs)
+
+                realterm = leadterm.get('term')
+                if realterm > term or self.getCellMeta('aha:term') is None:
+                    self.setCellMeta('aha:term', realterm)
+
+                # NOTE(review): this patch adds no import for s_aha in this file;
+                # confirm the alias is in scope (aha.py itself uses s_cell).
+                if state == s_aha.STATE_LEAD:
+                    await self.setMirror(None)
+                    return
+
+                if state == s_aha.STATE_FOLLOW:
+                    lead = leadterm.get('name')
+                    user = self.conf.get('aha:user', 'root')
+                    await self.setMirror(f'aha://{user}@{lead}')
+                    return
+
+                if state == s_aha.STATE_INVALID:
+                    await self._termStateInvalid()
+                    return
+
+    async def _termStateInvalid(self):
+        '''
+        Handle AHA reporting STATE_INVALID for this service by logging and shutting down.
+        '''
+        # hook point for enterprise behavior
+        logger.error('Leadership schism detected!')
+        logger.error('See: FIXME DOCS URL')
+        # TODO: should we allow an ENV var based override?
+        # TODO: should we find the leader and re-provision?
+        await self.fini()
+
+    def getAhaRegistry(self):
+        '''
+        Get the aha:registry config value, converting a str or list to a tuple.
+        '''
+        urls = self.conf.get('aha:registry')
+        if isinstance(urls, str):
+            return (urls,)
+
+        if isinstance(urls, list):
+            urls = tuple(urls)
+
+        return urls
+
+    def setAhaRegistry(self, urls):
+        '''
+        Save new aha:registry URLs to the cell config and the AHA client boot URLs.
+        '''
+        self.modCellConf({'aha:registry': urls})
+        self.ahaclient.setBootUrls(urls)
+
     async def _initAhaRegistry(self):
 
-        ahaurls = self.conf.get('aha:registry')
+        ahaurls = self.getAhaRegistry()
         if ahaurls is not None:
 
             await s_telepath.addAhaUrl(ahaurls)
@@ -1748,21 +1848,22 @@ async def _initAhaRegistry(self):
                 await self.ahaclient.fini()
 
             async def onlink(proxy):
+
                 ahauser = self.conf.get('aha:user', 'root')
+
+                oldurls = self.getAhaRegistry()
                 newurls = await proxy.getAhaUrls(user=ahauser)
-                oldurls = self.conf.get('aha:registry')
-                if isinstance(oldurls, str):
-                    oldurls = (oldurls,)
-                elif isinstance(oldurls, list):
-                    oldurls = tuple(oldurls)
+
                 if newurls and newurls != oldurls:
                     if oldurls[0].startswith('tcp://'):
                         s_common.deprecated('aha:registry: tcp:// client values.')
                         logger.warning('tcp:// based aha:registry options are deprecated and will be removed in Synapse v3.0.0')
                         return
 
-                    self.modCellConf({'aha:registry': newurls})
-                    self.ahaclient.setBootUrls(newurls)
+                    self.setAhaRegistry(newurls)
+
+                if self.conf.get('mirror') is None:
+                    await self._askAhaToLead(proxy)
 
             self.ahaclient = await s_telepath.Client.anit(ahaurls, onlink=onlink)
             self.onfini(self.ahaclient)
@@ -1941,13 +2042,31 @@ async def initServiceStorage(self):
         pass
 
     async def initNexusSubsystem(self):
-        if self.cellparent is None:
-            await self.nexsroot.recover()
-            await self.nexsroot.startup()
-            await self.setCellActive(self.conf.get('mirror') is None)
 
-            if self.minfree is not None:
-                self.schedCoro(self._runFreeSpaceLoop())
+        if self.cellparent is not None:
+            return
+
+        # NOTE: when AHA enabled, a service which believes it is the leader
+        # confirms that with AHA via _askAhaToLead() in the AHA onlink callback.
+        # TODO: or be readonly or a follower of None or something?
+
+        await self.nexsroot.recover()
+        await self.nexsroot.startup()
+
+        # read the mirror config here in case it changed
+        mirror = self.conf.get('mirror')
+        await self.setCellActive(mirror is None)
+
+        if self.minfree is not None:
+            self.schedCoro(self._runFreeSpaceLoop())
+
+    async def setMirror(self, url):
+        '''
+        Set the mirror URL (None to lead), update the active state and re-run nexus startup.
+        '''
+        self.conf.update({'mirror': url})
+        await self.setCellActive(url is None)
+        await self.nexsroot.startup()
 
     async def _bindDmonListen(self):
 
@@ -2174,14 +2293,19 @@ async def promote(self, graceful=False):
                 logger.warning(f'PROMOTION: Completed leadership handoff to {myurl}{_dispname}')
                 return
 
-        logger.debug(f'PROMOTION: Clearing mirror configuration{_dispname}.')
-        self.modCellConf({'mirror': None})
+        if self.ahaclient:
 
-        logger.debug(f'PROMOTION: Promoting the nexus root{_dispname}.')
-        await self.nexsroot.promote()
+            name = self.conf.get('aha:name')
+            proxy = await self.ahaclient.proxy(timeout=6)
 
-        logger.debug(f'PROMOTION: Setting the cell as active{_dispname}.')
-        await self.setCellActive(True)
+            logger.warning(f'PROMOTION: Updating AHA Leadership Term{_dispname}.')
+            async with self.nexslock:
+                nexs = await self.getNexsIndx()
+                leadterm = await proxy.nextLeadTerm(self.iden, name, nexs)
+                self.setCellMeta('aha:term', leadterm.get('term'))
+
+        logger.debug(f'PROMOTION: Clearing mirror configuration{_dispname}.')
+        await self.setMirror(None)
 
         logger.warning(f'PROMOTION: Finished leadership promotion{_dispname}.')
 
@@ -2226,14 +2350,8 @@ async def handoff(self, turl, timeout=30):
                 logger.debug(f'HANDOFF: Mirror has caught up to the current leader, performing promotion{_dispname}.')
                 await cell.promote()
 
-                logger.debug(f'HANDOFF: Setting the service as inactive{_dispname}.')
-                await self.setCellActive(False)
-
                 logger.debug(f'HANDOFF: Configuring service to sync from new leader{_dispname}.')
-                self.modCellConf({'mirror': turl})
-
-                logger.debug(f'HANDOFF: Restarting the nexus{_dispname}.')
-                await self.nexsroot.startup()
+                await self.setMirror(turl)
 
             logger.debug(f'HANDOFF: Released nexus lock{_dispname}.')
 
diff --git a/synapse/lib/schemas.py b/synapse/lib/schemas.py
index 85fbb528b77..54025c9e534 100644
--- a/synapse/lib/schemas.py
+++ b/synapse/lib/schemas.py
@@ -3,6 +3,20 @@
 import synapse.lib.msgpack as s_msgpack
 
 
+# Schema for the leader term definitions validated by reqValidLeadTerm().
+leadTermSchema = {
+    'type': 'object',
+    'properties': {
+        'iden': {'type': 'string', 'pattern': s_config.re_iden},
+        'name': {'type': 'string'},
+        'term': {'type': 'number'},
+        'nexs': {'type': 'number'},
+        'time': {'type': 'number'},
+    },
+    'required': ['iden', 'term', 'nexs', 'name', 'time'],
+}
+reqValidLeadTerm = s_config.getJsValidator(leadTermSchema)
+
 easyPermSchema = {
     'type': 'object',
     'properties': {