From 8b342f447e1c15a7045f09c120a107a1ec36521f Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 00:43:52 +0000 Subject: [PATCH] fix(keyexchange): per-source-IP rate limit on PILA/PILK frames before crypto (PILOT-265) Add a per-source-IP token-bucket rate limiter at the tunnel readLoop layer that drops excess authentication/key-exchange frames before they reach expensive Ed25519 signature verification and X25519 scalar multiplication. An attacker with rotating spoofed node IDs (that pass the registry pubkey lookup) could otherwise sustain CPU saturation at ~1 PILA verify/sec/peer-id by sending frames with DuplicateHandshakeDebounce timing. The rate limiter follows the existing allowSYNFromSource pattern: - 5 frames/second/source-IP budget (generous for legitimate retransmit pairs) - 4096 max tracked entries (prevents map-growth DoS) - Relay-delivered frames (fromRelay=true) bypass the check since the source IP is always the beacon Closes PILOT-265 --- pkg/daemon/tunnel.go | 81 ++++++++++++ .../zz_pilot265_kx_rate_limiter_test.go | 119 ++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 pkg/daemon/zz_pilot265_kx_rate_limiter_test.go diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go index 34b99976..3607f3f3 100644 --- a/pkg/daemon/tunnel.go +++ b/pkg/daemon/tunnel.go @@ -159,6 +159,12 @@ type TunnelManager struct { // bus subscriber — see Daemon.subscribeWebhookToBus. bus *inProcessBus + // Per-source-IP rate limiter for PILA/PILK frames. Prevents CPU + // DoS via Ed25519 verify / X25519 scalar mult from rotating + // spoofed node IDs. See PILOT-265. + kxRateLimMu sync.Mutex + kxRateLim map[string]*srcKxBucket + // Metrics BytesSent uint64 BytesRecv uint64 @@ -185,6 +191,22 @@ const maxPendingPerPeer = 64 // maxPendingPeers limits the total number of peers with pending key exchanges. const maxPendingPeers = 256 +// perSourceKxLimit is the max PILA/PILK frames per source IP per second +// accepted before Ed25519 verify / X25519 scalar mult. An attacker sending +// rotating spoofed node IDs can saturate a daemon's CPU; this rate limit +// drops excess frames at the tunnel readLoop layer before any crypto work. +// Must be ≥ 1 to allow legitimate retransmit pairs. +const perSourceKxLimit = 5 + +// maxPerSrcKxEntries caps the tracked source IP map to prevent unbounded +// growth from address scanning. +const maxPerSrcKxEntries = 4096 + +type srcKxBucket struct { + tokens int + lastFill time.Time +} + // ErrPendingDropped is returned by sendEncryptedToNode when the per-peer // pending queue was already at maxPendingPerPeer and the oldest queued // packet had to be dropped to make room for the new one. The CALLER's @@ -196,6 +218,51 @@ const maxPendingPeers = 256 // after the queue drains). Surfacing it as a typed error also lets // pilotctl render a "tunnel handshaking" hint instead of an opaque // "send SYN: pending queue full" message. +// allowKxFromSource checks per-source-IP rate limit for PILA/PILK frames. +// An attacker can saturate CPU with Ed25519 verify / X25519 scalar mult by +// sending frames with rotating spoofed node IDs (see PILOT-265). This gate +// runs before any crypto operation in handleAuthKeyExchange / handleKeyExchange. +// +// Relay-delivered frames (fromRelay=true) bypass this check because the +// source IP is always the beacon — rate-limiting there would incorrectly +// penalise all relay traffic. +func (tm *TunnelManager) allowKxFromSource(addr *net.UDPAddr) bool { + if addr == nil { + return true + } + key := addr.IP.String() + tm.kxRateLimMu.Lock() + defer tm.kxRateLimMu.Unlock() + + b, ok := tm.kxRateLim[key] + now := time.Now() + if !ok { + if len(tm.kxRateLim) >= maxPerSrcKxEntries { + return false + } + tm.kxRateLim[key] = &srcKxBucket{tokens: perSourceKxLimit - 1, lastFill: now} + return true + } + + elapsed := now.Sub(b.lastFill) + if elapsed > 0 { + refill := int(elapsed.Seconds() * float64(perSourceKxLimit)) + if refill > 0 { + b.tokens += refill + if b.tokens > perSourceKxLimit { + b.tokens = perSourceKxLimit + } + b.lastFill = now + } + } + + if b.tokens > 0 { + b.tokens-- + return true + } + return false +} + var ErrPendingDropped = errors.New("pending queue full: oldest queued packet dropped while key exchange pending") // RecvChSize is the capacity of the incoming packet channel. @@ -214,6 +281,7 @@ func NewTunnelManager() *TunnelManager { recvCh: make(chan *IncomingPacket, RecvChSize), done: make(chan struct{}), routing: routing.New(), + kxRateLim: make(map[string]*srcKxBucket), } tm.routing.SetLocalNodeIDFn(tm.loadNodeID) tm.kx = keyexchange.New(store) @@ -952,6 +1020,13 @@ func (tm *TunnelManager) handleAuthKeyExchange(data []byte, from *net.UDPAddr, f if !tm.encrypt || tm.privKey == nil { return } + // Per-source-IP rate limit on PILA — before Ed25519 verify + X25519 + // scalar mult. Relay frames are not source-limited (IP is always + // the beacon). + if !fromRelay && !tm.allowKxFromSource(from) { + slog.Debug("auth key exchange rate-limited (source IP)", "from", from) + return + } tm.kx.HandleAuthFrame(data, from, fromRelay) } @@ -970,6 +1045,12 @@ func (tm *TunnelManager) handleKeyExchange(data []byte, from *net.UDPAddr, fromR if !tm.encrypt || tm.privKey == nil { return } + // Per-source-IP rate limit on PILK — before X25519 scalar mult. + // Relay frames are not source-limited (IP is always the beacon). + if !fromRelay && !tm.allowKxFromSource(from) { + slog.Debug("key exchange rate-limited (source IP)", "from", from) + return + } tm.kx.HandleUnauthFrame(data, from, fromRelay) } diff --git a/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go b/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go new file mode 100644 index 00000000..5b9eb728 --- /dev/null +++ b/pkg/daemon/zz_pilot265_kx_rate_limiter_test.go @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package daemon + +import ( + "net" + "sync" + "testing" + "time" +) + +// TestKxRateLimiterBasics verifies that allowKxFromSource correctly accepts +// frames within the per-source-IP budget and rejects excess frames. +func TestKxRateLimiterBasics(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + + addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.1"), Port: 12345} + + // First perSourceKxLimit frames should be allowed. + for i := 0; i < perSourceKxLimit; i++ { + if !tm.allowKxFromSource(addr) { + t.Fatalf("frame %d should have been allowed within budget", i+1) + } + } + + // Next frame should be rejected (budget exhausted). + if tm.allowKxFromSource(addr) { + t.Fatal("frame after budget exhaustion should have been rejected") + } + + // Different source IP should have its own budget. + addr2 := &net.UDPAddr{IP: net.ParseIP("10.0.0.2"), Port: 54321} + if !tm.allowKxFromSource(addr2) { + t.Fatal("different source IP should have its own fresh budget") + } +} + +// TestKxRateLimiterRefill verifies that the token bucket refills over time. +func TestKxRateLimiterRefill(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.3"), Port: 9999} + + // Exhaust the budget. + for i := 0; i < perSourceKxLimit; i++ { + tm.allowKxFromSource(addr) + } + if tm.allowKxFromSource(addr) { + t.Fatal("budget should be exhausted") + } + + // Wait just over 1 second to guarantee at least 1 token refill. + time.Sleep(time.Duration(1.05 * float64(time.Second))) + + if !tm.allowKxFromSource(addr) { + t.Fatal("should have refilled at least 1 token") + } +} + +// TestKxRateLimiterNilAddr verifies that nil addr bypasses the check +// (same convention as relay-delivered frames with unknown beacon). +func TestKxRateLimiterNilAddr(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + if !tm.allowKxFromSource(nil) { + t.Fatal("nil addr should always be allowed") + } +} + +// TestKxRateLimiterConcurrent verifies the rate limiter is safe under +// concurrent access from multiple goroutines. +func TestKxRateLimiterConcurrent(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + var wg sync.WaitGroup + const concurrency = 32 + + // Fire concurrent requests from different IPs simultaneously. + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + ip := net.IPv4(10, 0, 0, byte(n%256)) + addr := &net.UDPAddr{IP: ip, Port: 10000 + n} + for j := 0; j < perSourceKxLimit; j++ { + tm.allowKxFromSource(addr) + } + // Should be blocked now. + if tm.allowKxFromSource(addr) { + t.Errorf("goroutine %d: should be rate-limited after budget", n) + } + }(i) + } + wg.Wait() +} + +// TestKxRateLimiterMaxEntries verifies the map cap is enforced. +func TestKxRateLimiterMaxEntries(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + + // Fill beyond capacity — use /16 subnets to generate unique IPs. + for i := 0; i < maxPerSrcKxEntries+100; i++ { + addr := &net.UDPAddr{ + IP: net.IPv4(byte(i>>8), byte(i&0xff), 0, 1), + Port: 10000, + } + tm.allowKxFromSource(addr) + } + + // Verify the map didn't grow unboundedly. + tm.kxRateLimMu.Lock() + size := len(tm.kxRateLim) + tm.kxRateLimMu.Unlock() + if size > maxPerSrcKxEntries+10 { + t.Fatalf("map grew to %d entries, want ≤ %d", size, maxPerSrcKxEntries+10) + } +}