From 4a4802f7a9806a828baca724cc765922fac1a029 Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 06:58:36 +0000 Subject: [PATCH] fix: defer saveTrust I/O outside hm.mu.Lock via dirty-signal goroutine (PILOT-325) saveTrust() calls AtomicWrite (temp+rename+fsync) which is I/O-bound. Previously called inside hm.mu.Lock on every trust mutation, serializing all handshake operations behind disk latency. Fix: add dirty (chan struct{}, buf 1) + done channels to Manager, start a drainSaves goroutine that persists trust state asynchronously, and replace all in-lock hm.saveTrust() with non-blocking hm.markDirty(). saveTrust now takes its own RLock internally. Net: under sustained handshake load (e.g. joining a fresh network), trust grants no longer block concurrent handshake operations. --- handshake.go | 70 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/handshake.go b/handshake.go index c4dc23d..3ef722e 100644 --- a/handshake.go +++ b/handshake.go @@ -89,6 +89,8 @@ type Manager struct { wg sync.WaitGroup // tracks background RPCs for clean shutdown reapStop chan struct{} // signals replay reaper to stop stopOnce sync.Once // ensures reapStop is closed only once + dirty chan struct{} // buffered 1: non-blocking signal to drain goroutine + done chan struct{} // closed in Stop to signal drain goroutine exit // Replay protection replayMu sync.Mutex @@ -113,6 +115,8 @@ func NewManager(rt Runtime) *Manager { revoked: make(map[uint32]time.Time), replaySet: make(map[[32]byte]time.Time), trustWaiters: make(map[uint32][]chan struct{}), + dirty: make(chan struct{}, 1), + done: make(chan struct{}), } if path := rt.IdentityPath(); path != "" { @@ -121,6 +125,8 @@ func NewManager(rt Runtime) *Manager { hm.loadTrust() } + go hm.drainSaves() + return hm } @@ -133,6 +139,8 @@ func (hm *Manager) Stop() { hm.stopping = true hm.mu.Unlock() + close(hm.done) + hm.stopOnce.Do(func() { if hm.reapStop != nil { close(hm.reapStop) @@ -297,6 +305,9 @@ func (hm *Manager) saveTrust() { return } + hm.mu.RLock() + defer hm.mu.RUnlock() + snap := trustSnapshot{} for _, r := range hm.trusted { snap.Trusted = append(snap.Trusted, trustSnapshotEntry{ @@ -343,6 +354,29 @@ func (hm *Manager) saveTrust() { slog.Debug("trust state saved", "peers", len(hm.trusted), "pending", len(hm.pending), "revoked", len(snap.Revoked)) } +// drainSaves drains the dirty channel and persists trust state +// asynchronously, so that AtomicWrite (including fsync) does not +// block other handshake operations holding hm.mu. +func (hm *Manager) drainSaves() { + for { + select { + case <-hm.dirty: + hm.saveTrust() + case <-hm.done: + return + } + } +} + +// markDirty signals the drain goroutine that trust state needs +// persistence. Caller MUST hold hm.mu (write-locked). +func (hm *Manager) markDirty() { + select { + case hm.dirty <- struct{}{}: + default: + } +} + func (hm *Manager) loadTrust() { if hm.storePath == "" { return @@ -643,7 +677,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": peerNodeID, "state": "granted", "reason": "mutual", }) - hm.saveTrust() + hm.markDirty() hm.sendAcceptLocked(peerNodeID) // Report trust to registry if reg := hm.rt.Registry(); reg != nil { @@ -670,7 +704,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": peerNodeID, "state": "granted", "reason": "same_network", }) - hm.saveTrust() + hm.markDirty() hm.sendAcceptLocked(peerNodeID) // Report trust to registry if reg := hm.rt.Registry(); reg != nil { @@ -702,7 +736,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": peerNodeID, "state": "granted", "reason": "trusted_agent", "agent": name, }) - hm.saveTrust() + hm.markDirty() hm.sendAcceptLocked(peerNodeID) if reg := hm.rt.Registry(); reg != nil { selfID := hm.rt.NodeID() @@ -727,7 +761,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": peerNodeID, "state": "granted", "reason": "auto_approve", }) - hm.saveTrust() + hm.markDirty() hm.sendAcceptLocked(peerNodeID) if reg := hm.rt.Registry(); reg != nil { selfID := hm.rt.NodeID() @@ -747,7 +781,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis Justification: msg.Justification, ReceivedAt: time.Now(), } - hm.saveTrust() + hm.markDirty() slog.Info("handshake request pending approval", "peer_node_id", peerNodeID) hm.rt.PublishEvent("handshake.pending", map[string]interface{}{ "peer_node_id": peerNodeID, "justification": msg.Justification, @@ -779,7 +813,7 @@ func (hm *Manager) handleAccept(msg *HandshakeMsg) { ApprovedAt: time.Now(), Mutual: true, }) - hm.saveTrust() + hm.markDirty() // Report trust to registry if reg := hm.rt.Registry(); reg != nil { @@ -882,7 +916,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string Mutual: true, }) slog.Info("mutual relayed handshake auto-approved", "peer_node_id", fromNodeID) - hm.saveTrust() + hm.markDirty() // Respond via registry and backfill public key if reg := hm.rt.Registry(); reg != nil { nodeID, peerID := hm.rt.NodeID(), fromNodeID @@ -903,7 +937,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string Network: hm.sharedNetwork(fromNodeID), }) slog.Info("same network relayed handshake auto-approved", "peer_node_id", fromNodeID) - hm.saveTrust() + hm.markDirty() if reg := hm.rt.Registry(); reg != nil { nodeID, peerID := hm.rt.NodeID(), fromNodeID sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID)) @@ -932,7 +966,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": fromNodeID, "state": "granted", "reason": "trusted_agent_relayed", "agent": name, }) - hm.saveTrust() + hm.markDirty() if reg := hm.rt.Registry(); reg != nil { nodeID, peerID := hm.rt.NodeID(), fromNodeID sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID)) @@ -959,7 +993,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string hm.rt.PublishEvent("trust.changed", map[string]interface{}{ "peer_node_id": fromNodeID, "state": "granted", "reason": "auto_approve_relayed", }) - hm.saveTrust() + hm.markDirty() if reg := hm.rt.Registry(); reg != nil { nodeID, peerID := hm.rt.NodeID(), fromNodeID sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID)) @@ -978,7 +1012,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string Justification: justification, ReceivedAt: time.Now(), } - hm.saveTrust() + hm.markDirty() slog.Info("relayed handshake request pending approval", "from_node_id", fromNodeID, "justification", justification) } @@ -1017,7 +1051,7 @@ func (hm *Manager) processRelayedApproval(fromNodeID uint32) { ApprovedAt: time.Now(), Mutual: true, }) - hm.saveTrust() + hm.markDirty() slog.Info("trust established via relayed approval", "peer_node_id", fromNodeID) // Backfill public key from registry @@ -1050,7 +1084,7 @@ func (hm *Manager) backfillPeerKey(peerNodeID uint32) { defer hm.mu.Unlock() if rec, ok := hm.trusted[peerNodeID]; ok && rec.PublicKey == "" { rec.PublicKey = pubKeyB64 - hm.saveTrust() + hm.markDirty() slog.Debug("backfilled peer public key", "peer_node_id", peerNodeID) } } @@ -1083,7 +1117,7 @@ func (hm *Manager) ApproveHandshake(peerNodeID uint32) error { PublicKey: req.PublicKey, ApprovedAt: time.Now(), }) - hm.saveTrust() + hm.markDirty() hm.mu.Unlock() slog.Info("handshake approved", "peer_node_id", peerNodeID) @@ -1114,7 +1148,7 @@ func (hm *Manager) ApproveHandshake(peerNodeID uint32) error { func (hm *Manager) RejectHandshake(peerNodeID uint32, reason string) error { hm.mu.Lock() delete(hm.pending, peerNodeID) - hm.saveTrust() + hm.markDirty() hm.mu.Unlock() slog.Info("handshake rejected", "peer_node_id", peerNodeID, "reason", reason) @@ -1163,7 +1197,7 @@ func (hm *Manager) RevokeTrust(peerNodeID uint32) error { // the normal poll cycle many times over. hm.revoked[peerNodeID] = time.Now().Add(5 * time.Minute) if wasTrusted || wasPending { - hm.saveTrust() + hm.markDirty() } hm.mu.Unlock() @@ -1224,7 +1258,7 @@ func (hm *Manager) handleRevokeMsg(msg *HandshakeMsg) { delete(hm.pending, peerNodeID) delete(hm.outgoing, peerNodeID) if wasTrusted || wasPending { - hm.saveTrust() + hm.markDirty() } hm.mu.Unlock() @@ -1403,7 +1437,7 @@ func (hm *Manager) reapStalePending() { for _, id := range stale { delete(hm.pending, id) } - hm.saveTrust() + hm.markDirty() slog.Info("reaped stale pending handshakes", "count", len(stale), "ttl_hours", int(pendingHandshakeTTL/time.Hour),