diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go
index 34b99976..3607f3f3 100644
--- a/pkg/daemon/tunnel.go
+++ b/pkg/daemon/tunnel.go
@@ -159,6 +159,15 @@ type TunnelManager struct {
 	// bus subscriber — see Daemon.subscribeWebhookToBus.
 	bus *inProcessBus
 
+	// Per-source-IP rate limiter for PILA/PILK frames. Prevents CPU
+	// DoS via Ed25519 verify / X25519 scalar mult from rotating
+	// spoofed node IDs. See PILOT-265. kxRateLimMu guards the bucket
+	// map and kxRateLimSwept, the time idle buckets were last
+	// reclaimed.
+	kxRateLimMu    sync.Mutex
+	kxRateLim      map[string]*srcKxBucket
+	kxRateLimSwept time.Time
+
 	// Metrics
 	BytesSent uint64
 	BytesRecv uint64
@@ -185,6 +194,99 @@ const maxPendingPerPeer = 64
 // maxPendingPeers limits the total number of peers with pending key exchanges.
 const maxPendingPeers = 256
 
+// perSourceKxLimit is the max PILA/PILK frames per source IP per second
+// accepted before Ed25519 verify / X25519 scalar mult. An attacker sending
+// rotating spoofed node IDs can saturate a daemon's CPU; this rate limit
+// drops excess frames at the tunnel readLoop layer before any crypto work.
+// Must be ≥ 1 to allow legitimate retransmit pairs.
+const perSourceKxLimit = 5
+
+// maxPerSrcKxEntries caps the tracked source IP map to prevent unbounded
+// growth from address scanning.
+const maxPerSrcKxEntries = 4096
+
+// kxBucketIdle is how long a source must stay silent before its bucket
+// refills completely (perSourceKxLimit tokens at perSourceKxLimit per
+// second). A bucket idle this long carries no state worth keeping, so it
+// can be reclaimed when the map is full.
+const kxBucketIdle = time.Second
+
+// srcKxBucket is the token bucket state for one source IP.
+type srcKxBucket struct {
+	tokens   int
+	lastFill time.Time
+}
+
+// allowKxFromSource reports whether a PILA/PILK frame from addr fits the
+// per-source-IP budget. An attacker can saturate CPU with Ed25519 verify /
+// X25519 scalar mult by sending frames with rotating spoofed node IDs (see
+// PILOT-265), so handleAuthKeyExchange / handleKeyExchange call this gate
+// before any crypto operation.
+//
+// Callers skip the gate for relay-delivered frames (fromRelay=true) because
+// their source IP is always the beacon — rate-limiting there would
+// incorrectly penalise all relay traffic. A nil addr is always allowed.
+//
+// When maxPerSrcKxEntries sources are already tracked, idle buckets are
+// reclaimed before an unknown source is refused; otherwise a burst of
+// spoofed source IPs would lock out every new peer while the process runs.
+func (tm *TunnelManager) allowKxFromSource(addr *net.UDPAddr) bool {
+	if addr == nil {
+		return true
+	}
+	key := addr.IP.String()
+	tm.kxRateLimMu.Lock()
+	defer tm.kxRateLimMu.Unlock()
+
+	b, ok := tm.kxRateLim[key]
+	now := time.Now()
+	if !ok {
+		if len(tm.kxRateLim) >= maxPerSrcKxEntries && !tm.reclaimIdleKxBucketsLocked(now) {
+			return false
+		}
+		if tm.kxRateLim == nil {
+			// Zero-value TunnelManager: allocate on first use.
+			tm.kxRateLim = make(map[string]*srcKxBucket)
+		}
+		tm.kxRateLim[key] = &srcKxBucket{tokens: perSourceKxLimit - 1, lastFill: now}
+		return true
+	}
+
+	elapsed := now.Sub(b.lastFill)
+	if elapsed > 0 {
+		refill := int(elapsed.Seconds() * float64(perSourceKxLimit))
+		if refill > 0 {
+			b.tokens += refill
+			if b.tokens > perSourceKxLimit {
+				b.tokens = perSourceKxLimit
+			}
+			b.lastFill = now
+		}
+	}
+
+	if b.tokens > 0 {
+		b.tokens--
+		return true
+	}
+	return false
+}
+
+// reclaimIdleKxBucketsLocked deletes buckets idle for at least kxBucketIdle
+// and reports whether the map then has room for a new source. Sweeps run at
+// most once per kxBucketIdle so a flood of unknown sources cannot turn the
+// O(n) scan into a CPU cost of its own. Caller must hold kxRateLimMu.
+func (tm *TunnelManager) reclaimIdleKxBucketsLocked(now time.Time) bool {
+	if now.Sub(tm.kxRateLimSwept) >= kxBucketIdle {
+		tm.kxRateLimSwept = now
+		for ip, b := range tm.kxRateLim {
+			if now.Sub(b.lastFill) >= kxBucketIdle {
+				delete(tm.kxRateLim, ip)
+			}
+		}
+	}
+	return len(tm.kxRateLim) < maxPerSrcKxEntries
+}
+
 // ErrPendingDropped is returned by sendEncryptedToNode when the per-peer
 // pending queue was already at maxPendingPerPeer and the oldest queued
 // packet had to be dropped to make room for the new one. The CALLER's
@@ -214,6 +316,7 @@ func NewTunnelManager() *TunnelManager {
 		recvCh: make(chan *IncomingPacket, RecvChSize),
 		done: make(chan struct{}),
 		routing: routing.New(),
+		kxRateLim: make(map[string]*srcKxBucket),
 	}
 	tm.routing.SetLocalNodeIDFn(tm.loadNodeID)
 	tm.kx = keyexchange.New(store)
@@ -952,6 +1055,13 @@ func (tm *TunnelManager) handleAuthKeyExchange(data []byte, from *net.UDPAddr, f
 	if !tm.encrypt || tm.privKey == nil {
 		return
 	}
+	// Per-source-IP rate limit on PILA — before Ed25519 verify + X25519
+	// scalar mult. Relay frames are not source-limited (IP is always
+	// the beacon).
+	if !fromRelay && !tm.allowKxFromSource(from) {
+		slog.Debug("auth key exchange rate-limited (source IP)", "from", from)
+		return
+	}
 	tm.kx.HandleAuthFrame(data, from, fromRelay)
 }
 
@@ -970,6 +1080,12 @@ func (tm *TunnelManager) handleKeyExchange(data []byte, from *net.UDPAddr, fromR
 	if !tm.encrypt || tm.privKey == nil {
 		return
 	}
+	// Per-source-IP rate limit on PILK — before X25519 scalar mult.
+	// Relay frames are not source-limited (IP is always the beacon).
+	if !fromRelay && !tm.allowKxFromSource(from) {
+		slog.Debug("key exchange rate-limited (source IP)", "from", from)
+		return
+	}
 	tm.kx.HandleUnauthFrame(data, from, fromRelay)
 }
 
diff --git a/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go b/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go
new file mode 100644
index 00000000..5b9eb728
--- /dev/null
+++ b/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package daemon
+
+import (
+	"net"
+	"sync"
+	"testing"
+	"time"
+)
+
+// ageKxBuckets backdates every tracked bucket and the last sweep by d so
+// tests can exercise refill and reclaim without sleeping.
+func ageKxBuckets(tm *TunnelManager, d time.Duration) {
+	tm.kxRateLimMu.Lock()
+	defer tm.kxRateLimMu.Unlock()
+	for _, b := range tm.kxRateLim {
+		b.lastFill = b.lastFill.Add(-d)
+	}
+	tm.kxRateLimSwept = tm.kxRateLimSwept.Add(-d)
+}
+
+// TestKxRateLimiterBasics verifies that allowKxFromSource correctly accepts
+// frames within the per-source-IP budget and rejects excess frames.
+func TestKxRateLimiterBasics(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+
+	addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.1"), Port: 12345}
+
+	// First perSourceKxLimit frames should be allowed.
+	for i := 0; i < perSourceKxLimit; i++ {
+		if !tm.allowKxFromSource(addr) {
+			t.Fatalf("frame %d should have been allowed within budget", i+1)
+		}
+	}
+
+	// Next frame should be rejected (budget exhausted).
+	if tm.allowKxFromSource(addr) {
+		t.Fatal("frame after budget exhaustion should have been rejected")
+	}
+
+	// Different source IP should have its own budget.
+	addr2 := &net.UDPAddr{IP: net.ParseIP("10.0.0.2"), Port: 54321}
+	if !tm.allowKxFromSource(addr2) {
+		t.Fatal("different source IP should have its own fresh budget")
+	}
+}
+
+// TestKxRateLimiterRefill verifies that the token bucket refills over time.
+func TestKxRateLimiterRefill(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+	addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.3"), Port: 9999}
+
+	// Exhaust the budget.
+	for i := 0; i < perSourceKxLimit; i++ {
+		tm.allowKxFromSource(addr)
+	}
+	if tm.allowKxFromSource(addr) {
+		t.Fatal("budget should be exhausted")
+	}
+
+	// Backdate the bucket rather than sleeping: after kxBucketIdle of
+	// silence the whole budget must be available again.
+	ageKxBuckets(tm, kxBucketIdle)
+
+	for i := 0; i < perSourceKxLimit; i++ {
+		if !tm.allowKxFromSource(addr) {
+			t.Fatalf("frame %d should have been allowed after refill", i+1)
+		}
+	}
+}
+
+// TestKxRateLimiterNilAddr verifies that nil addr bypasses the check
+// (same convention as relay-delivered frames with unknown beacon).
+func TestKxRateLimiterNilAddr(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+	if !tm.allowKxFromSource(nil) {
+		t.Fatal("nil addr should always be allowed")
+	}
+}
+
+// TestKxRateLimiterConcurrent verifies the rate limiter is safe under
+// concurrent access from multiple goroutines.
+func TestKxRateLimiterConcurrent(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+	var wg sync.WaitGroup
+	const concurrency = 32
+
+	// Fire concurrent requests from different IPs simultaneously.
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1)
+		go func(n int) {
+			defer wg.Done()
+			ip := net.IPv4(10, 0, 0, byte(n%256))
+			addr := &net.UDPAddr{IP: ip, Port: 10000 + n}
+			for j := 0; j < perSourceKxLimit; j++ {
+				tm.allowKxFromSource(addr)
+			}
+			// Should be blocked now.
+			if tm.allowKxFromSource(addr) {
+				t.Errorf("goroutine %d: should be rate-limited after budget", n)
+			}
+		}(i)
+	}
+	wg.Wait()
+}
+
+// TestKxRateLimiterMaxEntries verifies the map cap is enforced.
+func TestKxRateLimiterMaxEntries(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+
+	// Fill beyond capacity — the first two octets encode i, so every
+	// source IP is unique.
+	for i := 0; i < maxPerSrcKxEntries+100; i++ {
+		addr := &net.UDPAddr{
+			IP:   net.IPv4(byte(i>>8), byte(i&0xff), 0, 1),
+			Port: 10000,
+		}
+		tm.allowKxFromSource(addr)
+	}
+
+	// The cap is exact: the map must never exceed it.
+	tm.kxRateLimMu.Lock()
+	size := len(tm.kxRateLim)
+	tm.kxRateLimMu.Unlock()
+	if size > maxPerSrcKxEntries {
+		t.Fatalf("map grew to %d entries, want ≤ %d", size, maxPerSrcKxEntries)
+	}
+}
+
+// TestKxRateLimiterReclaimsIdle verifies that a full map does not lock out
+// new sources for good: once the tracked sources have been idle for
+// kxBucketIdle, their buckets are reclaimed to admit a new source.
+func TestKxRateLimiterReclaimsIdle(t *testing.T) {
+	t.Parallel()
+	tm := NewTunnelManager()
+
+	for i := 0; i < maxPerSrcKxEntries; i++ {
+		addr := &net.UDPAddr{
+			IP:   net.IPv4(byte(i>>8), byte(i&0xff), 0, 1),
+			Port: 10000,
+		}
+		tm.allowKxFromSource(addr)
+	}
+	ageKxBuckets(tm, kxBucketIdle)
+
+	fresh := &net.UDPAddr{IP: net.ParseIP("192.0.2.1"), Port: 10000}
+	if !tm.allowKxFromSource(fresh) {
+		t.Fatal("new source should be admitted once idle buckets are reclaimed")
+	}
+
+	tm.kxRateLimMu.Lock()
+	size := len(tm.kxRateLim)
+	tm.kxRateLimMu.Unlock()
+	if size != 1 {
+		t.Fatalf("map holds %d entries after reclaim, want 1", size)
+	}
+}