From a0156b644cdef94145312e5769910c3809bd1063 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 6 May 2026 12:15:52 -0400 Subject: [PATCH 1/6] Syntax cleanup and added a sequence number to heartbeat messages to detect duplicate deliveries. --- modules/src/protocol.js | 5 +- modules/src/state-sync.js | 157 ++++++++++++++++++++------------------ 2 files changed, 86 insertions(+), 76 deletions(-) diff --git a/modules/src/protocol.js b/modules/src/protocol.js index 6076ffed..2a644195 100644 --- a/modules/src/protocol.js +++ b/modules/src/protocol.js @@ -22,12 +22,13 @@ const OP_HEARTBEAT = "HB" const OP_GET = "GET" const OP_CLAIM = "CLAIM" -export function Heartbeat(fromSite, fromClass, hashSet, address = "") { +export function Heartbeat(fromSite, fromClass, hashSet, sequence, address = "") { let body = { version: VERSION, op: OP_HEARTBEAT, site: fromSite, sclass: fromClass, + seq: sequence, address: address, } @@ -97,7 +98,7 @@ export async function DispatchMessage(body, onHeartbeat, onGet, onClaim) { switch (body.op) { case OP_HEARTBEAT: - await onHeartbeat(body.sclass, body.site, body.hashset, body.address) + await onHeartbeat(body.sclass, body.site, body.hashset, body.seq || 0, body.address) break case OP_GET: await onGet(body.site, body.statekey) diff --git a/modules/src/state-sync.js b/modules/src/state-sync.js index 8a8643f2..fab6226e 100644 --- a/modules/src/state-sync.js +++ b/modules/src/state-sync.js @@ -40,16 +40,16 @@ export const CLASS_MEMBER = "member" const HEARTBEAT_PERIOD_SECONDS = 10 // TODO - make this much longer const HEARTBEAT_WINDOW_SECONDS = 5 -var localClass -var localId -var localAddress -var addressToUse -var initialBeacon = true -var onNewPeer -var onPeerLost -var onStateChange -var onStateRequest -var onPing +let localClass +let localId +let localAddress +let addressToUse +let initialBeacon = true +let onNewPeer +let onPeerLost +let onStateChange +let onStateRequest +let onPing // // Concepts: @@ -65,49 +65,52 @@ var onPing // RemoteState - The remote state that is intended to be synchronized FROM a peer. // -var extraTargets = [] -var connections = {} // {connectionKey: conn-record} -var peers = {} // {peerId: {connectionKey: , peerClass: , localState: {stateKey: hash}, remoteState: {stateKey: hash}}} +const extraTargets = [] +const connections = {} // {connectionKey: conn-record} +const peers = {} // {peerId: {connectionKey: , peerClass: , localState: {stateKey: hash}, remoteState: {stateKey: hash}}} +let nextSequence = 1; -const timerDelayMsec = function (floorSec) { +// +// Extend the delay by a random interval within a specificed window. This is intended to +// spread out the distribution of heartbeats over time. +// +function timerDelayMsec(floorSec) { return ( Math.floor(Math.random() * (HEARTBEAT_WINDOW_SECONDS + 1) + floorSec) * 1000 ) } -const sendHeartbeat = function (peerId) { +function sendHeartbeat(peerId) { let peer = peers[peerId] if (!!peer) { if (peer.hbTimer) { clearTimeout(peer.hbTimer) } - const sender = connections[peer.connectionKey].apiSender - const message = protocol.Heartbeat( - localId, - localClass, - peer.localState, - addressToUse, - ) - amqp.SendMessage(sender, message, {}, peer.address) + const sender = connections[peer.connectionKey]?.apiSender + if (!!sender) { + const message = protocol.Heartbeat( + localId, + localClass, + peer.localState, + nextSequence++, + addressToUse + ) + amqp.SendMessage(sender, message, {}, peer.address) + } peers[peerId].hbTimer = setTimeout( sendHeartbeat, timerDelayMsec(HEARTBEAT_PERIOD_SECONDS), - peerId, + peerId ) //Log(`SYNC: Sent Heartbeat to ${peerId}`); - //Log(message); + //Log(peer.localState); } } -const onHeartbeat = async function ( - connectionKey, - peerClass, - peerId, - hashset, - address, -) { - var localState - var remoteState +async function onHeartbeat(connectionKey, peerClass, peerId, hashset, sequence, address) { + let localState + let remoteState + let newPeer = false //Log(`SYNC: Received Heartbeat from ${peerId}`); initialBeacon = false @@ -115,6 +118,7 @@ const onHeartbeat = async function ( // If this heartbeat comes from a peer we are not tracking, consider this a new-peer. // if (!peers[peerId]) { + newPeer = true //Log(`SYNC: New Peer, id: ${peerId}`); ;[localState, remoteState] = await onNewPeer(peerId, peerClass) peers[peerId] = { @@ -124,8 +128,19 @@ const onHeartbeat = async function ( localState: localState, remoteState: remoteState, hbTimer: null, + lastSequence: undefined, } + } + + if (sequence === peers[peerId].lastSequence) { + // This is a duplicate heartbeat that arrived via a different path + return + } else { + peers[peerId].lastSequence = sequence + peers[peerId].connectionKey = connectionKey + } + if (newPeer) { // // Send a heartbeat back to the newly discovered peer with the local hash-state. // @@ -168,9 +183,7 @@ const onHeartbeat = async function ( delete peers[peerId].remoteState[key] } } catch (error) { - Log( - `Exception in state reconciliation for deletion of ${key}: ${error.message}`, - ) + Log(`Exception in state reconciliation for deletion of ${key}: ${error.message}`) } } @@ -180,14 +193,12 @@ const onHeartbeat = async function ( const sender = connections[connectionKey].apiSender for (const key of toRequestStateKeys) { try { - Log( - `SYNC: Requesting state update for key: ${key}, to: ${peers[peerId].address}`, - ) + Log(`SYNC: Requesting state update for key: ${key}, to: ${peers[peerId].address}`) const [ap, body] = await amqp.Request( sender, protocol.GetState(localId, key), {}, - peers[peerId].address, + peers[peerId].address ) if (body.statusCode == 200) { Log(`SYNC: New State: hash=${body.hash}, data=`) @@ -195,9 +206,7 @@ const onHeartbeat = async function ( await onStateChange(peerId, key, body.hash, body.data) peers[peerId].remoteState[key] = body.hash } else { - throw Error( - `Protocol error on GetState: (${body.statusCode}) ${body.statusDescription}`, - ) + throw new Error(`Protocol error on GetState: (${body.statusCode}) ${body.statusDescription}`) } } catch (error) { Log(`Exception in state reconciliation for ${key}: ${error.message}`) @@ -207,7 +216,7 @@ const onHeartbeat = async function ( } } -const sendInitialBeacon = function () { +function sendInitialBeacon() { try { if (initialBeacon && connections["net"]) { const sender = connections["net"].apiSender @@ -217,9 +226,11 @@ const sendInitialBeacon = function () { localId, localClass, null, - addressToUse, + nextSequence++, + addressToUse ) amqp.SendMessage(sender, message, {}, address) + //Log('Sent beacon...') } } } catch (e) { @@ -231,29 +242,27 @@ const sendInitialBeacon = function () { } } -const onSendable = function (connectionKey) { +function onSendable(connectionKey) { if (initialBeacon) { sendInitialBeacon() } } -const onAddress = function (connectionKey, address) { +function onAddress(connectionKey, address) { if (connectionKey == "net") { addressToUse = address } else { - Log( - `ERROR: onAddress invoked with connectionKey '${connectionKey}', expected 'net`, - ) + Log(`ERROR: onAddress invoked with connectionKey '${connectionKey}', expected 'net'`) } } -const processMessage = async function (connectionKey, body, onReply) { +async function processMessage(connectionKey, body, onReply) { try { await protocol.DispatchMessage( body, - async (sclass, site, hashset, address) => { + async (sclass, site, hashset, sequence, address) => { // onHeartbeat - await onHeartbeat(connectionKey, sclass, site, hashset, address) + await onHeartbeat(connectionKey, sclass, site, hashset, sequence, address) }, async (site, statekey) => { // onGet @@ -263,7 +272,7 @@ const processMessage = async function (connectionKey, body, onReply) { }, async (claimId, name) => { // onClaim - }, + } ) } catch (error) { Log(`Exception in sync message processing: ${error.message}`) @@ -271,24 +280,18 @@ const processMessage = async function (connectionKey, body, onReply) { } } -var processingContext = {} // peerId => {workQueue, processing} +const processingContext = {} // peerId => {workQueue, processing} -const processWorkQueue = async function (siteId) { +async function processWorkQueue(siteId) { while (processingContext[siteId].processing) { - const [connectionKey, body, onReply] = - processingContext[siteId].workQueue.shift() + const [connectionKey, body, onReply] = processingContext[siteId].workQueue.shift() await processMessage(connectionKey, body, onReply) processingContext[siteId].processing = processingContext[siteId].workQueue.length > 0 } } -const onMessage = function ( - connectionKey, - application_properties, - body, - onReply, -) { +function onMessage(connectionKey, application_properties, body, onReply) { const siteId = protocol.SourceSite(body) if (!processingContext[siteId]) { @@ -335,22 +338,22 @@ export async function AddTarget(targetAddress) { // // Add a new AMQP connection for communication. // -// backboneId : The identifier of the backbone to which this connection connects - undefined == connected to management-controller -// conn : The AMQP connection +// key : The identifier of the access point to which this connection connects - undefined == connected to management-controller +// conn : The AMQP connection // -export async function AddConnection(backboneId, conn) { - const connectionKey = backboneId || "net" +export async function AddConnection(key, conn) { + const connectionKey = key || "net" // // If someone is creating a backbone connection and the local address was not provided in the Start function, // throw an error. This is an unintended use of this module. If there is a dynamic local address, there shall // be no more than one connection in place at a time. // - if (!!backboneId && !localAddress) { + if (!!key && !localAddress) { const error = "Illegal adding of a backbone connection when no local address has been established" Log(`state-sync.AddConnection: ${error}`) - throw Error(error) + throw new Error(error) } let connRecord = { @@ -384,15 +387,21 @@ export async function AddConnection(backboneId, conn) { connRecord.apiReceiver.connectionKey = connectionKey connections[connectionKey] = connRecord + //Log(`Added connection for key ${connectionKey}`) } // // Delete an AMQP connection - This does not affect the lifecycle of known peers. // -// backboneId : The identifier (or undefined for the management-controller) of the connected backbone +// key : The identifier (or undefined for the management-controller) of the connected access point // -export async function DeleteConnection(backboneId) { - delete connections[backboneId] +export async function DeleteConnection(key) { + delete connections[key] + //console.log(`Deleted connection for key ${key}`) +} + +export async function DeletePeer(peerId) { + delete peers[peerId] } // From bbde10c0e9462c83851cddbde85dcb8fb3ccd932 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 6 May 2026 12:18:12 -0400 Subject: [PATCH 2/6] Disambiguated the global 'namespace' from argument names. --- modules/src/kube.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/src/kube.js b/modules/src/kube.js index 02198d52..9d6f52a5 100644 --- a/modules/src/kube.js +++ b/modules/src/kube.js @@ -219,10 +219,10 @@ export async function GetPods() { return list.items } -export async function getPodsByLabel(namespace, labelSelector) { +export async function getPodsByLabel(ns, labelSelector) { try { const response = await v1Api.listNamespacedPod({ - namespace: namespace, + namespace: ns, labelSelector: labelSelector }); return response.items; @@ -231,11 +231,11 @@ export async function getPodsByLabel(namespace, labelSelector) { } } -export async function waitPodsRunning(namespace, label, interval=1000, attempts=30) { +export async function waitPodsRunning(ns, label, interval=1000, attempts=30) { for (let i=0; i Date: Wed, 6 May 2026 12:19:39 -0400 Subject: [PATCH 3/6] Fixed incorrect reference to the standalone namespace for non-standalone deployments. --- components/site-controller/src/sc-main.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/components/site-controller/src/sc-main.js b/components/site-controller/src/sc-main.js index 280f8a05..daa6c25d 100644 --- a/components/site-controller/src/sc-main.js +++ b/components/site-controller/src/sc-main.js @@ -73,16 +73,14 @@ export async function Main() { Log(`Site-Id : ${site_id}`); let conn; - if ( PLATFORM == 'sk2' ) { + if (PLATFORM == 'sk2') { Log('Waiting for skupper-router pod to be Running...'); - if (!kube.waitPodsRunning(STANDALONE_NAMESPACE, 'application=skupper-router')) { + if (!kube.waitPodsRunning(kube.Namespace(), 'application=skupper-router')) { Log('Skupper-router is not running, exiting'); process.exit(1); } let certs = await GetLocalRouterCerts(); conn = amqp.OpenConnection('LocalRouter', 'skupper-router-local', '5671', 'tls', certs.ca, certs.cert, certs.key); - } else { - conn = amqp.OpenConnection('LocalRouter'); } await router.Start(conn); if (PLATFORM != 'sk2') { From 910cccaada64646153e746116aba8d7716f187a0 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 6 May 2026 12:21:25 -0400 Subject: [PATCH 4/6] Added notification of site deletion so sync heartbeats are not sent to deleted sites. --- components/management-controller/src/api-admin.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/components/management-controller/src/api-admin.js b/components/management-controller/src/api-admin.js index dd55ebac..95d2edad 100644 --- a/components/management-controller/src/api-admin.js +++ b/components/management-controller/src/api-admin.js @@ -21,7 +21,7 @@ import { IncomingForm } from 'formidable'; import { ClientFromPool, queryWithContext } from './db.js'; -import { SiteIngressChanged, LinkChanged } from './sync-management.js'; +import { SiteIngressChanged, LinkChanged, SiteDeleted } from './sync-management.js'; import { Log } from '@skupperx/modules/log' import { ManageIngressAdded, LinkAddedOrDeleted, ManageIngressDeleted } from './site-deployment-state.js'; import { ValidateAndNormalizeFields, IsValidUuid, UniquifyName } from '@skupperx/modules/util'; @@ -547,7 +547,12 @@ const deleteBackboneSite = async function(req, res) { await client.query("DELETE FROM TlsCertificates WHERE Id = $1", [row.certificate]) } } - }) + }); + + // + // Notify the state-sync module that the site is no longer here. + // + SiteDeleted(sid); res.status(returnStatus).end(); await WatchNotify('InteriorSites', sid); From 8fe02371d9a048f96463f62a9dd5f5b98785aaad Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 6 May 2026 12:22:43 -0400 Subject: [PATCH 5/6] Minor whitespace cleanup. --- components/management-controller/src/mc-main.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/management-controller/src/mc-main.js b/components/management-controller/src/mc-main.js index f10b0993..92790115 100644 --- a/components/management-controller/src/mc-main.js +++ b/components/management-controller/src/mc-main.js @@ -24,7 +24,7 @@ import yaml from 'yaml'; import fs from 'node:fs'; import rhea from 'rhea'; import * as bbLinks from './backbone-links.js'; -import * as externalVans from './external-vans.js'; +import * as externalVans from './external-vans.js'; import * as certs from './certs.js'; import * as prune from './prune.js'; import * as db from './db.js'; From 07e02c23c1b74b5ece946bc486277eeda6885dce Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 6 May 2026 12:23:42 -0400 Subject: [PATCH 6/6] Fixes #92 - Syntax cleanup. Updated the algorithm to maintain connections to all _manage_ access points rather than just one per backbone. --- .../src/backbone-links.js | 57 +++++++++---------- .../src/sync-management.js | 52 +++++++++-------- 2 files changed, 54 insertions(+), 55 deletions(-) diff --git a/components/management-controller/src/backbone-links.js b/components/management-controller/src/backbone-links.js index 06b2c700..aa5bd77d 100644 --- a/components/management-controller/src/backbone-links.js +++ b/components/management-controller/src/backbone-links.js @@ -32,19 +32,19 @@ let controller_name; let tls_ca; let tls_cert; let tls_key; -let bbConnections = {}; +let manageConnections = {}; let registrations = []; -const createConnection = async function(bbid, row) { - bbConnections[bbid] = { +async function createConnection(apid, row) { + manageConnections[apid] = { toDelete: false, host: row.hostname, port: row.port, }; Log(`Connecting to Access Point: ${row.hostname}:${row.port}`); - bbConnections[bbid].conn = OpenConnection( - `Backbone-management-${bbid}`, + manageConnections[apid].conn = OpenConnection( + `Backbone-management-${apid}`, row.hostname, row.port, 'tls', @@ -53,31 +53,26 @@ const createConnection = async function(bbid, row) { tls_key); for (const reg of registrations) { - await reg.onLinkAdded(bbid, bbConnections[bbid].conn); + await reg.onLinkAdded(apid, manageConnections[apid].conn); } } -const deleteConnection = async function(bbid) { - let conn = bbConnections[bbid].conn; +async function deleteConnection(apid) { + let conn = manageConnections[apid].conn; CloseConnection(conn); - delete bbConnections[bbid]; + delete manageConnections[apid]; for (const reg of registrations) { - await reg.onLinkDeleted(bbid); + await reg.onLinkDeleted(apid); } } -const reconcileBackboneConnections = async function() { +async function reconcileBackboneConnections() { let reschedule_delay = 30000; const client = await ClientFromPool('system'); try { await client.query('BEGIN'); - const result = await client.query( - "SELECT BackboneAccessPoints.*, InteriorSites.Backbone " + - "FROM BackboneAccessPoints " + - "JOIN InteriorSites ON InteriorSites.Id = InteriorSite " + - "JOIN Backbones ON Backbones.Id = InteriorSites.Backbone " + - "WHERE BackboneAccessPoints.Lifecycle = 'ready' and Kind = 'manage'"); + const result = await client.query("SELECT * FROM BackboneAccessPoints WHERE Lifecycle = 'ready' and Kind = 'manage'"); let db_rows = {}; for (const row of result.rows) { if (!db_rows[row.backbone]) { @@ -85,21 +80,21 @@ const reconcileBackboneConnections = async function() { } } - for (const bbid of Object.keys(bbConnections)) { - bbConnections[bbid].toDelete = true; + for (const apid of Object.keys(manageConnections)) { + manageConnections[apid].toDelete = true; } - for (const [bbid, row] of Object.entries(db_rows)) { - if (bbConnections[bbid]) { - bbConnections[bbid].toDelete = false; + for (const row of result.rows) { + if (manageConnections[row.id]) { + manageConnections[row.id].toDelete = false; } else { - await createConnection(bbid, row); + await createConnection(row.id, row); } } - for (const bbid of Object.keys(bbConnections)) { - if (bbConnections[bbid].toDelete) { - await deleteConnection(bbid); + for (const apid of Object.keys(manageConnections)) { + if (manageConnections[apid].toDelete) { + await deleteConnection(apid); } } @@ -114,7 +109,7 @@ const reconcileBackboneConnections = async function() { } } -const resolveTLSData = async function() { +async function resolveTLSData() { let reschedule_delay = 1000; const client = await ClientFromPool('system'); try { @@ -139,13 +134,13 @@ const resolveTLSData = async function() { } if (count != 3) { - throw(Error(`Unexpected set of values from TLS secret data - expected 3, got ${count}`)); + throw new Error(`Unexpected set of values from TLS secret data - expected 3, got ${count}`); } reschedule_delay = -1; setTimeout(reconcileBackboneConnections, 0); } else { - throw(Error(`Expected to find a TlsCertificate record for ready controller: ${result.rows[0].certificate}`)); + throw new Error(`Expected to find a TlsCertificate record for ready controller: ${result.rows[0].certificate}`); } } await client.query('COMMIT'); @@ -161,7 +156,7 @@ const resolveTLSData = async function() { } } -const resolveControllerRecord = async function() { +async function resolveControllerRecord() { let reschedule_delay = -1; const client = await ClientFromPool('system'); try { @@ -188,7 +183,7 @@ const resolveControllerRecord = async function() { } export async function RegisterHandler(onAdded, onDeleted) { - for (const [key, value] of Object.entries(bbConnections)) { + for (const [key, value] of Object.entries(manageConnections)) { await onAdded(key, value.conn); } diff --git a/components/management-controller/src/sync-management.js b/components/management-controller/src/sync-management.js index 706c6cd1..4de6920c 100644 --- a/components/management-controller/src/sync-management.js +++ b/components/management-controller/src/sync-management.js @@ -30,7 +30,7 @@ import { Log } from '@skupperx/modules/log' import { API_CONTROLLER_ADDRESS } from '@skupperx/modules/common' import { ClientFromPool } from './db.js'; import { LoadSecret } from '@skupperx/modules/kube' -import { CLASS_MEMBER, CLASS_BACKBONE, AddConnection, DeleteConnection, UpdateLocalState, Start as StateSyncStart, CLASS_MANAGEMENT } from '@skupperx/modules/state-sync' +import { CLASS_MEMBER, CLASS_BACKBONE, AddConnection, DeleteConnection, UpdateLocalState, Start as StateSyncStart, CLASS_MANAGEMENT, DeletePeer } from '@skupperx/modules/state-sync' import { onMewMember, StateRequest } from './sync-application.js'; import { RegisterHandler } from './backbone-links.js'; import { HashOfSecret, HashOfData } from './resource-templates.js'; @@ -78,7 +78,7 @@ export async function GetBackboneAccessPoints_TX(client, siteId, initialOnly = f //========================================================================================================================= // Backbone Site Handlers //========================================================================================================================= -const onNewBackboneSite = async function(peerId) { +async function onNewBackboneSite(peerId) { // // peerId identifies the row in InteriorSites // @@ -174,11 +174,11 @@ const onNewBackboneSite = async function(peerId) { return [localState, remoteState]; } -const onLostBackbone = async function(peerId) { +async function onLostBackbone(peerId) { // Nothing to do here - Consider adding status to the schema to indicate a stale site } -const onStateChangeBackbone = async function(peerId, stateKey, hash, data) { +async function onStateChangeBackbone(peerId, stateKey, hash, data) { // // Notes: // This will update the access point with host/port on initial site creation. @@ -215,7 +215,7 @@ const onStateChangeBackbone = async function(peerId, stateKey, hash, data) { } } -const getStateTlsBackboneSite = async function(siteId) { +async function getStateTlsBackboneSite(siteId) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -240,7 +240,7 @@ const getStateTlsBackboneSite = async function(siteId) { return [hash, data]; } -const getStateTlsMemberSite = async function(siteId) { +async function getStateTlsMemberSite(siteId) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -265,7 +265,7 @@ const getStateTlsMemberSite = async function(siteId) { return [hash, data]; } -const getStateTlsServer = async function(apid) { +async function getStateTlsServer(apid) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -290,7 +290,7 @@ const getStateTlsServer = async function(apid) { return [hash, data]; } -const getStateAccessPoint = async function(apId) { +async function getStateAccessPoint(apId) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -318,7 +318,7 @@ const getStateAccessPoint = async function(apId) { return [hash, data]; } -const getStateBackboneLink = async function(linkId) { +async function getStateBackboneLink(linkId) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -347,7 +347,7 @@ const getStateBackboneLink = async function(linkId) { return [hash, data]; } -const getStateMemberLink = async function(linkId) { +async function getStateMemberLink(linkId) { var hash = null; var data = null; const client = await ClientFromPool('system'); @@ -376,7 +376,7 @@ const getStateMemberLink = async function(linkId) { return [hash, data]; } -const onStateRequestBackbone = async function(peerId, stateKey) { // => [hash, data] +async function onStateRequestBackbone(peerId, stateKey) { var hash = null; var data = null; @@ -398,7 +398,7 @@ const onStateRequestBackbone = async function(peerId, stateKey) { // => [hash, d //========================================================================================================================= // Member Site Handlers //========================================================================================================================= -const onNewMember = async function(peerId) { +async function onNewMember(peerId) { // // peerId identifies the row in MemberSites // @@ -470,15 +470,15 @@ const onNewMember = async function(peerId) { return [localState, remoteState]; } -const onLostMember = async function(peerId) { +async function onLostMember(peerId) { // TODO } -const onStateChangeMember = async function(peerId, stateKey, hash, data) { +async function onStateChangeMember(peerId, stateKey, hash, data) { // There is no local state on a member site } -const onStateRequestMember = async function(peerId, stateKey) { +async function onStateRequestMember(peerId, stateKey) { var hash = null; var data = null; @@ -497,7 +497,7 @@ const onStateRequestMember = async function(peerId, stateKey) { //========================================================================================================================= // Sync Handlers //========================================================================================================================= -const onNewPeer = async function(peerId, peerClass) { +async function onNewPeer(peerId, peerClass) { var localState; var remoteState; peers[peerId] = { @@ -513,7 +513,7 @@ const onNewPeer = async function(peerId, peerClass) { return [localState, remoteState]; } -const onPeerLost = async function(peerId) { +async function onPeerLost(peerId) { const peer = peers[peerId]; if (!!peer) { if (peer.pClass == CLASS_MEMBER) { @@ -526,7 +526,7 @@ const onPeerLost = async function(peerId) { } } -const onStateChange = async function(peerId, stateKey, hash, data) { +async function onStateChange(peerId, stateKey, hash, data) { const peer = peers[peerId]; if (!!peer) { if (peer.pClass == CLASS_MEMBER) { @@ -537,7 +537,7 @@ const onStateChange = async function(peerId, stateKey, hash, data) { } } -const onStateRequest = async function(peerId, stateKey) { +async function onStateRequest(peerId, stateKey) { var hash = null; var data = null; const peer = peers[peerId]; @@ -551,7 +551,7 @@ const onStateRequest = async function(peerId, stateKey) { return [hash, data]; } -const onPing = async function(peerId) { +async function onPing(peerId) { const client = await ClientFromPool('system'); try { await client.query("BEGIN"); @@ -580,12 +580,12 @@ const onPing = async function(peerId) { //========================================================================================================================= // Backbone Link Handlers //========================================================================================================================= -const onLinkAdded = async function(backboneId, conn) { - await AddConnection(backboneId, conn); +async function onLinkAdded(key, conn) { + await AddConnection(key, conn); } -const onLinkDeleted = async function(backboneId) { - await DeleteConnection(backboneId); +async function onLinkDeleted(key) { + await DeleteConnection(key); } //========================================================================================================================= @@ -732,6 +732,10 @@ export async function NewIngressAvailable(siteId) { } } +export async function SiteDeleted(siteId) { + DeletePeer(siteId); +} + export async function Start() { await StateSyncStart(CLASS_MANAGEMENT, 'mc', API_CONTROLLER_ADDRESS, onNewPeer, onPeerLost, onStateChange, onStateRequest, onPing); await RegisterHandler(onLinkAdded, onLinkDeleted);