Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Manager struct {
wg sync.WaitGroup // tracks background RPCs for clean shutdown
reapStop chan struct{} // signals replay reaper to stop
stopOnce sync.Once // ensures reapStop is closed only once
dirty chan struct{} // buffered 1: non-blocking signal to drain goroutine
done chan struct{} // closed in Stop to signal drain goroutine exit

// Replay protection
replayMu sync.Mutex
Expand All @@ -113,6 +115,8 @@ func NewManager(rt Runtime) *Manager {
revoked: make(map[uint32]time.Time),
replaySet: make(map[[32]byte]time.Time),
trustWaiters: make(map[uint32][]chan struct{}),
dirty: make(chan struct{}, 1),
done: make(chan struct{}),
}

if path := rt.IdentityPath(); path != "" {
Expand All @@ -121,6 +125,8 @@ func NewManager(rt Runtime) *Manager {
hm.loadTrust()
}

go hm.drainSaves()

return hm
}

Expand All @@ -133,6 +139,8 @@ func (hm *Manager) Stop() {
hm.stopping = true
hm.mu.Unlock()

close(hm.done)

hm.stopOnce.Do(func() {
if hm.reapStop != nil {
close(hm.reapStop)
Expand Down Expand Up @@ -297,6 +305,9 @@ func (hm *Manager) saveTrust() {
return
}

hm.mu.RLock()
defer hm.mu.RUnlock()

snap := trustSnapshot{}
for _, r := range hm.trusted {
snap.Trusted = append(snap.Trusted, trustSnapshotEntry{
Expand Down Expand Up @@ -343,6 +354,29 @@ func (hm *Manager) saveTrust() {
slog.Debug("trust state saved", "peers", len(hm.trusted), "pending", len(hm.pending), "revoked", len(snap.Revoked))
}

// drainSaves drains the dirty channel and persists trust state
// asynchronously, so that AtomicWrite (including fsync) does not
// block other handshake operations holding hm.mu.
func (hm *Manager) drainSaves() {
for {
select {
case <-hm.dirty:
hm.saveTrust()
case <-hm.done:
return
}
}
}

// markDirty signals the drain goroutine that trust state needs
// persistence. Caller MUST hold hm.mu (write-locked).
func (hm *Manager) markDirty() {
select {
case hm.dirty <- struct{}{}:
default:
}
}

func (hm *Manager) loadTrust() {
if hm.storePath == "" {
return
Expand Down Expand Up @@ -643,7 +677,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": peerNodeID, "state": "granted", "reason": "mutual",
})
hm.saveTrust()
hm.markDirty()
hm.sendAcceptLocked(peerNodeID)
// Report trust to registry
if reg := hm.rt.Registry(); reg != nil {
Expand All @@ -670,7 +704,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": peerNodeID, "state": "granted", "reason": "same_network",
})
hm.saveTrust()
hm.markDirty()
hm.sendAcceptLocked(peerNodeID)
// Report trust to registry
if reg := hm.rt.Registry(); reg != nil {
Expand Down Expand Up @@ -702,7 +736,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": peerNodeID, "state": "granted", "reason": "trusted_agent", "agent": name,
})
hm.saveTrust()
hm.markDirty()
hm.sendAcceptLocked(peerNodeID)
if reg := hm.rt.Registry(); reg != nil {
selfID := hm.rt.NodeID()
Expand All @@ -727,7 +761,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": peerNodeID, "state": "granted", "reason": "auto_approve",
})
hm.saveTrust()
hm.markDirty()
hm.sendAcceptLocked(peerNodeID)
if reg := hm.rt.Registry(); reg != nil {
selfID := hm.rt.NodeID()
Expand All @@ -747,7 +781,7 @@ func (hm *Manager) handleRequest(stream coreapi.Stream, msg *HandshakeMsg, regis
Justification: msg.Justification,
ReceivedAt: time.Now(),
}
hm.saveTrust()
hm.markDirty()
slog.Info("handshake request pending approval", "peer_node_id", peerNodeID)
hm.rt.PublishEvent("handshake.pending", map[string]interface{}{
"peer_node_id": peerNodeID, "justification": msg.Justification,
Expand Down Expand Up @@ -779,7 +813,7 @@ func (hm *Manager) handleAccept(msg *HandshakeMsg) {
ApprovedAt: time.Now(),
Mutual: true,
})
hm.saveTrust()
hm.markDirty()

// Report trust to registry
if reg := hm.rt.Registry(); reg != nil {
Expand Down Expand Up @@ -882,7 +916,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string
Mutual: true,
})
slog.Info("mutual relayed handshake auto-approved", "peer_node_id", fromNodeID)
hm.saveTrust()
hm.markDirty()
// Respond via registry and backfill public key
if reg := hm.rt.Registry(); reg != nil {
nodeID, peerID := hm.rt.NodeID(), fromNodeID
Expand All @@ -903,7 +937,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string
Network: hm.sharedNetwork(fromNodeID),
})
slog.Info("same network relayed handshake auto-approved", "peer_node_id", fromNodeID)
hm.saveTrust()
hm.markDirty()
if reg := hm.rt.Registry(); reg != nil {
nodeID, peerID := hm.rt.NodeID(), fromNodeID
sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID))
Expand Down Expand Up @@ -932,7 +966,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": fromNodeID, "state": "granted", "reason": "trusted_agent_relayed", "agent": name,
})
hm.saveTrust()
hm.markDirty()
if reg := hm.rt.Registry(); reg != nil {
nodeID, peerID := hm.rt.NodeID(), fromNodeID
sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID))
Expand All @@ -959,7 +993,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string
hm.rt.PublishEvent("trust.changed", map[string]interface{}{
"peer_node_id": fromNodeID, "state": "granted", "reason": "auto_approve_relayed",
})
hm.saveTrust()
hm.markDirty()
if reg := hm.rt.Registry(); reg != nil {
nodeID, peerID := hm.rt.NodeID(), fromNodeID
sig := hm.signHandshakeChallenge(fmt.Sprintf("respond:%d:%d", nodeID, peerID))
Expand All @@ -978,7 +1012,7 @@ func (hm *Manager) processRelayedRequest(fromNodeID uint32, justification string
Justification: justification,
ReceivedAt: time.Now(),
}
hm.saveTrust()
hm.markDirty()
slog.Info("relayed handshake request pending approval", "from_node_id", fromNodeID, "justification", justification)
}

Expand Down Expand Up @@ -1017,7 +1051,7 @@ func (hm *Manager) processRelayedApproval(fromNodeID uint32) {
ApprovedAt: time.Now(),
Mutual: true,
})
hm.saveTrust()
hm.markDirty()
slog.Info("trust established via relayed approval", "peer_node_id", fromNodeID)

// Backfill public key from registry
Expand Down Expand Up @@ -1050,7 +1084,7 @@ func (hm *Manager) backfillPeerKey(peerNodeID uint32) {
defer hm.mu.Unlock()
if rec, ok := hm.trusted[peerNodeID]; ok && rec.PublicKey == "" {
rec.PublicKey = pubKeyB64
hm.saveTrust()
hm.markDirty()
slog.Debug("backfilled peer public key", "peer_node_id", peerNodeID)
}
}
Expand Down Expand Up @@ -1083,7 +1117,7 @@ func (hm *Manager) ApproveHandshake(peerNodeID uint32) error {
PublicKey: req.PublicKey,
ApprovedAt: time.Now(),
})
hm.saveTrust()
hm.markDirty()
hm.mu.Unlock()

slog.Info("handshake approved", "peer_node_id", peerNodeID)
Expand Down Expand Up @@ -1114,7 +1148,7 @@ func (hm *Manager) ApproveHandshake(peerNodeID uint32) error {
func (hm *Manager) RejectHandshake(peerNodeID uint32, reason string) error {
hm.mu.Lock()
delete(hm.pending, peerNodeID)
hm.saveTrust()
hm.markDirty()
hm.mu.Unlock()

slog.Info("handshake rejected", "peer_node_id", peerNodeID, "reason", reason)
Expand Down Expand Up @@ -1163,7 +1197,7 @@ func (hm *Manager) RevokeTrust(peerNodeID uint32) error {
// the normal poll cycle many times over.
hm.revoked[peerNodeID] = time.Now().Add(5 * time.Minute)
if wasTrusted || wasPending {
hm.saveTrust()
hm.markDirty()
}
hm.mu.Unlock()

Expand Down Expand Up @@ -1224,7 +1258,7 @@ func (hm *Manager) handleRevokeMsg(msg *HandshakeMsg) {
delete(hm.pending, peerNodeID)
delete(hm.outgoing, peerNodeID)
if wasTrusted || wasPending {
hm.saveTrust()
hm.markDirty()
}
hm.mu.Unlock()

Expand Down Expand Up @@ -1403,7 +1437,7 @@ func (hm *Manager) reapStalePending() {
for _, id := range stale {
delete(hm.pending, id)
}
hm.saveTrust()
hm.markDirty()
slog.Info("reaped stale pending handshakes",
"count", len(stale),
"ttl_hours", int(pendingHandshakeTTL/time.Hour),
Expand Down
Loading