Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions pkg/daemon/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ type TunnelManager struct {
// bus subscriber — see Daemon.subscribeWebhookToBus.
bus *inProcessBus

// Per-source-IP rate limiter for PILA/PILK frames. Prevents CPU
// DoS via Ed25519 verify / X25519 scalar mult from rotating
// spoofed node IDs. See PILOT-265.
kxRateLimMu sync.Mutex
kxRateLim map[string]*srcKxBucket

// Metrics
BytesSent uint64
BytesRecv uint64
Expand All @@ -185,6 +191,22 @@ const maxPendingPerPeer = 64
// maxPendingPeers limits the total number of peers with pending key exchanges.
const maxPendingPeers = 256

// perSourceKxLimit is the max PILA/PILK frames per source IP per second
// accepted before Ed25519 verify / X25519 scalar mult. An attacker sending
// rotating spoofed node IDs can saturate a daemon's CPU; this rate limit
// drops excess frames at the tunnel readLoop layer before any crypto work.
// Must be ≥ 1 to allow legitimate retransmit pairs.
const perSourceKxLimit = 5

// maxPerSrcKxEntries caps the tracked source IP map to prevent unbounded
// growth from address scanning.
const maxPerSrcKxEntries = 4096

type srcKxBucket struct {
tokens int
lastFill time.Time
}

// ErrPendingDropped is returned by sendEncryptedToNode when the per-peer
// pending queue was already at maxPendingPerPeer and the oldest queued
// packet had to be dropped to make room for the new one. The CALLER's
Expand All @@ -196,6 +218,51 @@ const maxPendingPeers = 256
// after the queue drains). Surfacing it as a typed error also lets
// pilotctl render a "tunnel handshaking" hint instead of an opaque
// "send SYN: pending queue full" message.
// allowKxFromSource checks per-source-IP rate limit for PILA/PILK frames.
// An attacker can saturate CPU with Ed25519 verify / X25519 scalar mult by
// sending frames with rotating spoofed node IDs (see PILOT-265). This gate
// runs before any crypto operation in handleAuthKeyExchange / handleKeyExchange.
//
// Relay-delivered frames (fromRelay=true) bypass this check because the
// source IP is always the beacon — rate-limiting there would incorrectly
// penalise all relay traffic.
func (tm *TunnelManager) allowKxFromSource(addr *net.UDPAddr) bool {
if addr == nil {
return true
}
key := addr.IP.String()
tm.kxRateLimMu.Lock()
defer tm.kxRateLimMu.Unlock()

b, ok := tm.kxRateLim[key]
now := time.Now()
if !ok {
if len(tm.kxRateLim) >= maxPerSrcKxEntries {
return false
}
tm.kxRateLim[key] = &srcKxBucket{tokens: perSourceKxLimit - 1, lastFill: now}
return true
}

elapsed := now.Sub(b.lastFill)
if elapsed > 0 {
refill := int(elapsed.Seconds() * float64(perSourceKxLimit))
if refill > 0 {
b.tokens += refill
if b.tokens > perSourceKxLimit {
b.tokens = perSourceKxLimit
}
b.lastFill = now
}
}

if b.tokens > 0 {
b.tokens--
return true
}
return false
}

var ErrPendingDropped = errors.New("pending queue full: oldest queued packet dropped while key exchange pending")

// RecvChSize is the capacity of the incoming packet channel.
Expand All @@ -214,6 +281,7 @@ func NewTunnelManager() *TunnelManager {
recvCh: make(chan *IncomingPacket, RecvChSize),
done: make(chan struct{}),
routing: routing.New(),
kxRateLim: make(map[string]*srcKxBucket),
}
tm.routing.SetLocalNodeIDFn(tm.loadNodeID)
tm.kx = keyexchange.New(store)
Expand Down Expand Up @@ -952,6 +1020,13 @@ func (tm *TunnelManager) handleAuthKeyExchange(data []byte, from *net.UDPAddr, f
if !tm.encrypt || tm.privKey == nil {
return
}
// Per-source-IP rate limit on PILA — before Ed25519 verify + X25519
// scalar mult. Relay frames are not source-limited (IP is always
// the beacon).
if !fromRelay && !tm.allowKxFromSource(from) {
slog.Debug("auth key exchange rate-limited (source IP)", "from", from)
return
}
tm.kx.HandleAuthFrame(data, from, fromRelay)
}

Expand All @@ -970,6 +1045,12 @@ func (tm *TunnelManager) handleKeyExchange(data []byte, from *net.UDPAddr, fromR
if !tm.encrypt || tm.privKey == nil {
return
}
// Per-source-IP rate limit on PILK — before X25519 scalar mult.
// Relay frames are not source-limited (IP is always the beacon).
if !fromRelay && !tm.allowKxFromSource(from) {
slog.Debug("key exchange rate-limited (source IP)", "from", from)
return
}
tm.kx.HandleUnauthFrame(data, from, fromRelay)
}

Expand Down
119 changes: 119 additions & 0 deletions pkg/daemon/zz_pilot265_kx_rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

package daemon

import (
"net"
"sync"
"testing"
"time"
)

// TestKxRateLimiterBasics verifies that allowKxFromSource correctly accepts
// frames within the per-source-IP budget and rejects excess frames.
func TestKxRateLimiterBasics(t *testing.T) {
t.Parallel()
tm := NewTunnelManager()

addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.1"), Port: 12345}

// First perSourceKxLimit frames should be allowed.
for i := 0; i < perSourceKxLimit; i++ {
if !tm.allowKxFromSource(addr) {
t.Fatalf("frame %d should have been allowed within budget", i+1)
}
}

// Next frame should be rejected (budget exhausted).
if tm.allowKxFromSource(addr) {
t.Fatal("frame after budget exhaustion should have been rejected")
}

// Different source IP should have its own budget.
addr2 := &net.UDPAddr{IP: net.ParseIP("10.0.0.2"), Port: 54321}
if !tm.allowKxFromSource(addr2) {
t.Fatal("different source IP should have its own fresh budget")
}
}

// TestKxRateLimiterRefill verifies that the token bucket refills over time.
func TestKxRateLimiterRefill(t *testing.T) {
t.Parallel()
tm := NewTunnelManager()
addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.3"), Port: 9999}

// Exhaust the budget.
for i := 0; i < perSourceKxLimit; i++ {
tm.allowKxFromSource(addr)
}
if tm.allowKxFromSource(addr) {
t.Fatal("budget should be exhausted")
}

// Wait just over 1 second to guarantee at least 1 token refill.
time.Sleep(time.Duration(1.05 * float64(time.Second)))

if !tm.allowKxFromSource(addr) {
t.Fatal("should have refilled at least 1 token")
}
}

// TestKxRateLimiterNilAddr verifies that nil addr bypasses the check
// (same convention as relay-delivered frames with unknown beacon).
func TestKxRateLimiterNilAddr(t *testing.T) {
t.Parallel()
tm := NewTunnelManager()
if !tm.allowKxFromSource(nil) {
t.Fatal("nil addr should always be allowed")
}
}

// TestKxRateLimiterConcurrent verifies the rate limiter is safe under
// concurrent access from multiple goroutines.
func TestKxRateLimiterConcurrent(t *testing.T) {
t.Parallel()
tm := NewTunnelManager()
var wg sync.WaitGroup
const concurrency = 32

// Fire concurrent requests from different IPs simultaneously.
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
ip := net.IPv4(10, 0, 0, byte(n%256))
addr := &net.UDPAddr{IP: ip, Port: 10000 + n}
for j := 0; j < perSourceKxLimit; j++ {
tm.allowKxFromSource(addr)
}
// Should be blocked now.
if tm.allowKxFromSource(addr) {
t.Errorf("goroutine %d: should be rate-limited after budget", n)
}
}(i)
}
wg.Wait()
}

// TestKxRateLimiterMaxEntries verifies the map cap is enforced.
func TestKxRateLimiterMaxEntries(t *testing.T) {
t.Parallel()
tm := NewTunnelManager()

// Fill beyond capacity — use /16 subnets to generate unique IPs.
for i := 0; i < maxPerSrcKxEntries+100; i++ {
addr := &net.UDPAddr{
IP: net.IPv4(byte(i>>8), byte(i&0xff), 0, 1),
Port: 10000,
}
tm.allowKxFromSource(addr)
}

// Verify the map didn't grow unboundedly.
tm.kxRateLimMu.Lock()
size := len(tm.kxRateLim)
tm.kxRateLimMu.Unlock()
if size > maxPerSrcKxEntries+10 {
t.Fatalf("map grew to %d entries, want ≤ %d", size, maxPerSrcKxEntries+10)
}
}
Loading