From 9a6b7c69cb51c35166c4ca11ce03d353f11a1287 Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 03:37:36 +0000 Subject: [PATCH] fix(beacon): add per-nodeID rate limit to Discover endpoint updates (PILOT-334) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handleDiscover now enforces a minimum 30s interval between endpoint updates from the same nodeID, preventing a single attacker-controlled nodeID from flapping its endpoint via rapid Discover messages. Mirrors the existing punch-request rate-limit pattern (SEC-026): server.go:59-60 (discoverRateMu + discoverRateLast), constant at server.go:139 (discoverMinInterval = 30s), and cleanup in reapStaleNodes() server.go:961-968. Rate-limited Discovers still receive the reply with their observed address — only the Upsert is skipped. --- server.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index 07050a5..bdc8fc8 100644 --- a/server.go +++ b/server.go @@ -56,6 +56,8 @@ type Server struct { punchRateMu sync.Mutex // protects punchSourceLast punchSourceLast map[string]time.Time // source IP → last allowed punch time lastPunchTime atomic.Int64 // UnixNano of last global punch (rate limit) + discoverRateMu sync.Mutex // protects discoverRateLast + discoverRateLast map[uint32]time.Time // nodeID → last allowed discover endpoint update // Peer mesh (gossip) beaconID uint32 @@ -116,6 +118,7 @@ const ( maxPunchPerSecond = 10 // global hard cap on punch commands per second punchPerSourceInterval = time.Second // min interval between punches from same source punchRateCleanupInterval = 5 * time.Minute // how often stale source entries are swept + discoverMinInterval = 30 * time.Second // min interval between endpoint updates from same nodeID ) func New() *Server { @@ -132,7 +135,8 @@ func NewWithPeers(beaconID uint32, peers []string) *Server { relayCh: make(chan relayJob, relayQueueSize), beaconID: beaconID, done: make(chan struct{}), - punchSourceLast: make(map[string]time.Time), + punchSourceLast: make(map[string]time.Time), + discoverRateLast: make(map[uint32]time.Time), } emptyPeers := make(map[uint32]*net.UDPAddr) s.peerNodes.Store(&emptyPeers) @@ -500,9 +504,20 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { nodeID := binary.BigEndian.Uint32(data[0:4]) - // Record this node's observed public endpoint. Sharded — no global lock. - if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap { - return // shard at capacity — drop silently + // Per-nodeID endpoint update rate limit — prevents a single + // nodeID from flapping its endpoint via rapid Discover messages. + s.discoverRateMu.Lock() + if last, ok := s.discoverRateLast[nodeID]; ok && time.Since(last) < discoverMinInterval { + s.discoverRateMu.Unlock() + // Rate-limited: skip the Upsert but still reply with the + // observed address so the node learns its public endpoint. + } else { + s.discoverRateLast[nodeID] = time.Now() + s.discoverRateMu.Unlock() + // Record this node's observed public endpoint. Sharded — no global lock. + if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap { + return // shard at capacity — drop silently + } } slog.Debug("beacon discover", "node_id", nodeID, "addr", remote) @@ -941,6 +956,16 @@ func (s *Server) reapStaleNodes() { } } s.punchRateMu.Unlock() + + // Sweep stale discover-rate entries. + s.discoverRateMu.Lock() + discoverCutoff := time.Now().Add(-discoverMinInterval * 2) + for id, last := range s.discoverRateLast { + if last.Before(discoverCutoff) { + delete(s.discoverRateLast, id) + } + } + s.discoverRateMu.Unlock() } // --- Gossip ---