diff --git a/server.go b/server.go index 07050a5..fa68efa 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,10 @@ type Server struct { punchSourceLast map[string]time.Time // source IP → last allowed punch time lastPunchTime atomic.Int64 // UnixNano of last global punch (rate limit) + // Per-source relay rate limiters (SEC-037). + relayRateMu sync.Mutex // protects relaySourceCount + relaySourceCount map[uint32]*relaySourceWindow // senderID → sliding window state + // Peer mesh (gossip) beaconID uint32 peers []*net.UDPAddr // peer beacon addresses (slow path, peerMu) @@ -118,6 +122,21 @@ const ( punchRateCleanupInterval = 5 * time.Minute // how often stale source entries are swept ) +// relaySourceWindow tracks a single source's relay count in the current 1-second window. +type relaySourceWindow struct { + windowStart int64 // UnixNano of current 1-second window start + count uint32 // relay count in current window +} + +// maxRelaysPerSourcePerSecond caps relays a single sender can push through +// dispatchRelay per second. Set generously (1000 relays/sec) — a legitimate +// source behind NAT may relay traffic for many agents, but a DoS source +// flooding relays to a known target can saturate the 524288-deep relayCh +// and cause queue-full drops for everyone. This cap prevents one source +// from consuming more than ~0.2% of total queue capacity per second. +const maxRelaysPerSourcePerSecond = 1000 +const relaySourceCleanupInterval = 5 * time.Minute + func New() *Server { return NewWithPeers(0, nil) } @@ -132,7 +151,8 @@ func NewWithPeers(beaconID uint32, peers []string) *Server { relayCh: make(chan relayJob, relayQueueSize), beaconID: beaconID, done: make(chan struct{}), - punchSourceLast: make(map[string]time.Time), + punchSourceLast: make(map[string]time.Time), + relaySourceCount: make(map[uint32]*relaySourceWindow), } emptyPeers := make(map[uint32]*net.UDPAddr) s.peerNodes.Store(&emptyPeers) @@ -653,6 +673,29 @@ func (s *Server) dispatchRelay(data []byte) { } } + // Per-source rate limit: prevent one sender from flooding the relay + // queue and squeezing out legitimate traffic. A DoS source targeting + // a known destination can saturate the 524288-deep relayCh at rates + // far above normal — this cap gives each source a fixed share. + now := time.Now().UnixNano() + s.relayRateMu.Lock() + w, ok := s.relaySourceCount[senderID] + if !ok || now-w.windowStart >= int64(time.Second) { + // New 1-second window. + s.relaySourceCount[senderID] = &relaySourceWindow{windowStart: now, count: 1} + s.relayRateMu.Unlock() + } else if w.count >= maxRelaysPerSourcePerSecond { + // Source exceeded per-second budget — silently drop. + // The sender's daemon retries (3-attempt path in + // pkg/daemon/daemon.go relay branch), so a drop here + // is eventually self-healing for honest senders. + s.relayRateMu.Unlock() + return + } else { + w.count++ + s.relayRateMu.Unlock() + } + // Copy payload into a pooled buffer so we don't hold the read buffer payload := data[8:] if len(payload) > maxRelayPayload { @@ -941,6 +984,16 @@ func (s *Server) reapStaleNodes() { } } s.punchRateMu.Unlock() + + // Sweep stale relay-source entries. + s.relayRateMu.Lock() + cutoffNs := time.Now().Add(-relaySourceCleanupInterval).UnixNano() + for id, w := range s.relaySourceCount { + if w.windowStart < cutoffNs { + delete(s.relaySourceCount, id) + } + } + s.relayRateMu.Unlock() } // --- Gossip ---