diff --git a/server.go b/server.go index 07050a5..bdc8fc8 100644 --- a/server.go +++ b/server.go @@ -56,6 +56,8 @@ type Server struct { punchRateMu sync.Mutex // protects punchSourceLast punchSourceLast map[string]time.Time // source IP → last allowed punch time lastPunchTime atomic.Int64 // UnixNano of last global punch (rate limit) + discoverRateMu sync.Mutex // protects discoverRateLast + discoverRateLast map[uint32]time.Time // nodeID → last allowed discover endpoint update // Peer mesh (gossip) beaconID uint32 @@ -116,6 +118,7 @@ const ( maxPunchPerSecond = 10 // global hard cap on punch commands per second punchPerSourceInterval = time.Second // min interval between punches from same source punchRateCleanupInterval = 5 * time.Minute // how often stale source entries are swept + discoverMinInterval = 30 * time.Second // min interval between endpoint updates from same nodeID ) func New() *Server { @@ -132,7 +135,8 @@ func NewWithPeers(beaconID uint32, peers []string) *Server { relayCh: make(chan relayJob, relayQueueSize), beaconID: beaconID, done: make(chan struct{}), - punchSourceLast: make(map[string]time.Time), + punchSourceLast: make(map[string]time.Time), + discoverRateLast: make(map[uint32]time.Time), } emptyPeers := make(map[uint32]*net.UDPAddr) s.peerNodes.Store(&emptyPeers) @@ -500,9 +504,20 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { nodeID := binary.BigEndian.Uint32(data[0:4]) - // Record this node's observed public endpoint. Sharded — no global lock. - if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap { - return // shard at capacity — drop silently + // Per-nodeID endpoint update rate limit — prevents a single + // nodeID from flapping its endpoint via rapid Discover messages. + s.discoverRateMu.Lock() + if last, ok := s.discoverRateLast[nodeID]; ok && time.Since(last) < discoverMinInterval { + s.discoverRateMu.Unlock() + // Rate-limited: skip the Upsert but still reply with the + // observed address so the node learns its public endpoint. + } else { + s.discoverRateLast[nodeID] = time.Now() + s.discoverRateMu.Unlock() + // Record this node's observed public endpoint. Sharded — no global lock. + if _, atCap := s.nodes.Upsert(nodeID, remote, time.Now(), maxBeaconNodes); atCap { + return // shard at capacity — drop silently + } } slog.Debug("beacon discover", "node_id", nodeID, "addr", remote) @@ -941,6 +956,16 @@ func (s *Server) reapStaleNodes() { } } s.punchRateMu.Unlock() + + // Sweep stale discover-rate entries. + s.discoverRateMu.Lock() + discoverCutoff := time.Now().Add(-discoverMinInterval * 2) + for id, last := range s.discoverRateLast { + if last.Before(discoverCutoff) { + delete(s.discoverRateLast, id) + } + } + s.discoverRateMu.Unlock() } // --- Gossip ---