Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions api/rpc/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rpc

import (
"context"
"net"
"net/http"
"sync"
"time"

"golang.org/x/time/rate"
)

// connLimit returns middleware that limits the number of concurrent requests.
// When the limit is reached, new requests receive 503 Service Unavailable.
// connLimit returns middleware that caps the number of requests handled
// concurrently at maxConns. Requests arriving while every slot is taken are
// shed immediately with 503 Service Unavailable rather than queued.
func connLimit(maxConns int, next http.Handler) http.Handler {
	slots := make(chan struct{}, maxConns)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case slots <- struct{}{}:
		default:
			// Every slot is busy: reject instead of blocking the client.
			log.Warnw("connection limit reached, rejecting request", "remote", r.RemoteAddr)
			http.Error(w, "server busy, try again later", http.StatusServiceUnavailable)
			return
		}
		defer func() { <-slots }()
		next.ServeHTTP(w, r)
	})
}

// rateLimit returns middleware that enforces per-IP rate limiting.
// Requests exceeding the limit receive 429 Too Many Requests.
// The background cleanup goroutine exits when ctx is canceled.
func rateLimit(ctx context.Context, rps, burst int, next http.Handler) http.Handler {
var mu sync.Mutex
type entry struct {
limiter *rate.Limiter
lastSeen time.Time
}
limiters := make(map[string]*entry)
rateL := rate.Limit(rps)

// Evict stale entries in the background.
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
mu.Lock()
for ip, e := range limiters {
if time.Since(e.lastSeen) > 10*time.Minute {
delete(limiters, ip)
}
}
mu.Unlock()
}
}
}()
Comment on lines +42 to +59
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Goroutine leak: rate limiter cleanup goroutine cannot be stopped if Start() fails or is never called

The rateLimit function at api/rpc/middleware.go:42 spawns a background goroutine during NewServer (via newHandlerStack at api/rpc/server.go:122). The only way to stop this goroutine is by canceling the context, which happens in Stop() at api/rpc/server.go:214. However, Stop() guards cancelFunc() behind s.started.CompareAndSwap(true, false) — if Start() was never called or failed (e.g., port already in use at api/rpc/server.go:183-186), the started flag remains false, the CompareAndSwap fails, and the method returns early at line 212 without ever calling cancelFunc(). This leaks the background goroutine (and its ticker). In the fx lifecycle (nodebuilder/rpc/module.go:21-26), if Start fails, fx does not call Stop for that component, so the goroutine is never cleaned up.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


getLimiter := func(ip string) *rate.Limiter {
mu.Lock()
defer mu.Unlock()
e, ok := limiters[ip]
if !ok {
l := rate.NewLimiter(rateL, burst)
limiters[ip] = &entry{limiter: l, lastSeen: time.Now()}
return l
}
e.lastSeen = time.Now()
return e.limiter
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := extractIP(r)
if !getLimiter(ip).Allow() {
log.Warnw("rate limit exceeded", "ip", ip)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If a client is actively being rate-limited, this will log a warning for every rejected request. Under a real DOS attack, this creates massive log volume, which itself could be a resource exhaustion vector (disk I/O, etc).

Maybe let's set it to debug?

http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}

// extractIP returns the IP portion of RemoteAddr (strips port).
func extractIP(r *http.Request) string {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just realised one more thing:
extractIP uses RemoteAddr, but since nodes can run behind a reverse proxy(confirmed by @sysrex), all clients will share a single rate limit bucket (100 req/s total). This means one abusive client can exhaust the limit and cause 429s for all legitimate users

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you don't really need any of the rate limiting if you delegate it to reverse proxy

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the rate limiting be opt-in (disabled by default, enable if no proxy) rather than always-on?

host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return host
}
140 changes: 140 additions & 0 deletions api/rpc/middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package rpc

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// testRemoteAddr is the canonical client address shared across these tests.
const testRemoteAddr = "1.2.3.4:1234"

// TestConnLimit_AllowsWithinLimit verifies that a request is served normally
// when the concurrent-connection cap is not reached.
func TestConnLimit_AllowsWithinLimit(t *testing.T) {
	okHandler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	wrapped := connLimit(2, okHandler)

	rec := httptest.NewRecorder()
	wrapped.ServeHTTP(rec, httptest.NewRequest("GET", "/", nil))
	assert.Equal(t, http.StatusOK, rec.Code)
}

// TestConnLimit_RejectsOverLimit verifies that while the single slot of a
// limit-1 middleware is held by an in-flight request, a concurrent request
// is rejected with 503 Service Unavailable.
func TestConnLimit_RejectsOverLimit(t *testing.T) {
	// blocked is closed by the handler once the first request occupies the
	// slot; released is closed by the test to let that handler finish.
	blocked := make(chan struct{})
	released := make(chan struct{})
	handler := connLimit(1, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Only the first request (marked via header) parks inside the handler.
		if r.Header.Get("X-Block") == "true" {
			close(blocked)
			<-released
		}
		w.WriteHeader(http.StatusOK)
	}))

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		req := httptest.NewRequest("GET", "/", nil)
		req.Header.Set("X-Block", "true")
		w := httptest.NewRecorder()
		handler.ServeHTTP(w, req)
	}()

	// Wait until the first request definitely holds the slot.
	<-blocked

	req := httptest.NewRequest("GET", "/", nil)
	w := httptest.NewRecorder()
	handler.ServeHTTP(w, req)
	assert.Equal(t, http.StatusServiceUnavailable, w.Code)

	// Unblock the parked handler and wait for the goroutine to exit.
	close(released)
	wg.Wait()
}

// TestConnLimit_ReleasesSlotAfterRequest verifies that a completed request
// frees its slot, so a second request on a limit-1 middleware also succeeds.
func TestConnLimit_ReleasesSlotAfterRequest(t *testing.T) {
	wrapped := connLimit(1, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))

	req := httptest.NewRequest("GET", "/", nil)

	first := httptest.NewRecorder()
	wrapped.ServeHTTP(first, req)
	require.Equal(t, http.StatusOK, first.Code)

	second := httptest.NewRecorder()
	wrapped.ServeHTTP(second, req)
	assert.Equal(t, http.StatusOK, second.Code)
}

// TestRateLimit_AllowsWithinLimit verifies that a single request within the
// configured rate and burst is served normally.
func TestRateLimit_AllowsWithinLimit(t *testing.T) {
	wrapped := rateLimit(context.Background(), 100, 10,
		http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))

	req := httptest.NewRequest("GET", "/", nil)
	req.RemoteAddr = testRemoteAddr

	rec := httptest.NewRecorder()
	wrapped.ServeHTTP(rec, req)
	assert.Equal(t, http.StatusOK, rec.Code)
}

// TestRateLimit_RejectsOverBurst verifies that requests beyond the burst
// allowance are rejected with 429 Too Many Requests.
func TestRateLimit_RejectsOverBurst(t *testing.T) {
	wrapped := rateLimit(context.Background(), 1, 3,
		http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))

	req := httptest.NewRequest("GET", "/", nil)
	req.RemoteAddr = testRemoteAddr

	// The burst allowance admits the first three requests.
	for n := 0; n < 3; n++ {
		rec := httptest.NewRecorder()
		wrapped.ServeHTTP(rec, req)
		assert.Equal(t, http.StatusOK, rec.Code, "request %d should succeed", n)
	}

	// The fourth request exceeds the burst and is rejected.
	rec := httptest.NewRecorder()
	wrapped.ServeHTTP(rec, req)
	assert.Equal(t, http.StatusTooManyRequests, rec.Code)
}

// TestRateLimit_DifferentIPsIndependent verifies that exhausting one IP's
// bucket does not affect requests arriving from a different IP.
func TestRateLimit_DifferentIPsIndependent(t *testing.T) {
	wrapped := rateLimit(context.Background(), 1, 1,
		http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))

	firstIP := httptest.NewRequest("GET", "/", nil)
	firstIP.RemoteAddr = testRemoteAddr

	// First request from IP #1 consumes its single burst token...
	rec := httptest.NewRecorder()
	wrapped.ServeHTTP(rec, firstIP)
	assert.Equal(t, http.StatusOK, rec.Code)

	// ...so its second request is rejected.
	rec = httptest.NewRecorder()
	wrapped.ServeHTTP(rec, firstIP)
	assert.Equal(t, http.StatusTooManyRequests, rec.Code)

	// IP #2 has its own fresh bucket and is still admitted.
	secondIP := httptest.NewRequest("GET", "/", nil)
	secondIP.RemoteAddr = "5.6.7.8:5678"
	rec = httptest.NewRecorder()
	wrapped.ServeHTTP(rec, secondIP)
	assert.Equal(t, http.StatusOK, rec.Code)
}

// TestExtractIP checks host extraction for IPv4, IPv6, and port-less
// remote addresses.
func TestExtractIP(t *testing.T) {
	cases := map[string]string{
		testRemoteAddr: "1.2.3.4",
		"[::1]:1234":   "::1",
		"1.2.3.4":      "1.2.3.4",
	}
	for addr, want := range cases {
		req := &http.Request{RemoteAddr: addr}
		assert.Equal(t, want, extractIP(req))
	}
}
51 changes: 40 additions & 11 deletions api/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ import (

var log = logging.Logger("rpc")

// TODO(@Wondertan): Expose in config if requested
const (
// maxRequestSize is 5 MiB, significantly lower than go-jsonrpc's 100 MiB default.
maxRequestSize = 5 << 20
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a 5 MiB limit would reject any blob.Submit call with a blob larger than ~3.75 MiB (after base64 + JSON envelope overhead), while the protocol allows blobs up to 32 MiB.

This is actually a breaking regression for the blob submission path.

// maxConcurrentConns caps simultaneous connections to bound goroutine/FD usage.
maxConcurrentConns = 500
// maxRequestsPerSecond is the per-IP sustained rate.
maxRequestsPerSecond = 100
// maxRequestBurst is the per-IP burst allowance.
maxRequestBurst = 200
)

type CORSConfig struct {
Enabled bool
AllowedOrigins []string
Expand All @@ -35,6 +47,7 @@ type Server struct {
authDisabled bool

started atomic.Bool
cancelFunc context.CancelFunc
corsConfig CORSConfig

tlsEnabled bool
Expand All @@ -59,12 +72,17 @@ func NewServer(
signer jwt.Signer,
verifier jwt.Verifier,
) *Server {
rpc := jsonrpc.NewServer()
rpc := jsonrpc.NewServer(
jsonrpc.WithMaxRequestSize(maxRequestSize),
)

ctx, cancel := context.WithCancel(context.Background())
srv := &Server{
rpc: rpc,
signer: signer,
verifier: verifier,
authDisabled: authDisabled,
cancelFunc: cancel,
corsConfig: corsConfig,
tlsEnabled: tlsConfig.Enabled,
tlsCertPath: tlsConfig.CertPath,
Expand All @@ -73,26 +91,36 @@ func NewServer(

srv.srv = &http.Server{
Addr: net.JoinHostPort(address, port),
Handler: srv.newHandlerStack(rpc),
Handler: srv.newHandlerStack(ctx, rpc),
// the amount of time allowed to read request headers. set to the default 2 seconds
ReadHeaderTimeout: 2 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
MaxHeaderBytes: 1 << 20, // 1 MiB
}

return srv
}

// newHandlerStack returns wrapped rpc related handlers
func (s *Server) newHandlerStack(core http.Handler) http.Handler {
if s.authDisabled {
// newHandlerStack returns wrapped rpc related handlers.
// Middleware order (outermost first): rate-limit → conn-limit → CORS/auth → RPC handler.
func (s *Server) newHandlerStack(ctx context.Context, core http.Handler) http.Handler {
var h http.Handler
switch {
case s.authDisabled:
log.Warn("auth disabled, allowing all origins, methods and headers for CORS")
return s.corsAny(core)
}

if s.corsConfig.Enabled {
return s.corsWithConfig(s.authHandler(core))
h = s.corsAny(core)
case s.corsConfig.Enabled:
h = s.corsWithConfig(s.authHandler(core))
default:
h = s.authHandler(core)
}

return s.authHandler(core)
// Apply connection and rate limiting as outermost layers.
h = connLimit(maxConcurrentConns, h)
h = rateLimit(ctx, maxRequestsPerSecond, maxRequestBurst, h)
return h
}

// verifyAuth is the RPC server's auth middleware. This middleware is only
Expand Down Expand Up @@ -183,6 +211,7 @@ func (s *Server) Stop(ctx context.Context) error {
log.Warn("cannot stop server: already stopped")
return nil
}
s.cancelFunc()
err := s.srv.Shutdown(ctx)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions api/rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestServer_HandlerStackSelection(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler := server.newHandlerStack(testHandler)
handler := server.newHandlerStack(context.Background(), testHandler)

// Test OPTIONS request (CORS preflight)
req := httptest.NewRequest("OPTIONS", "/", nil)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestServer_AuthDisabledOverridesCORS(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler := server.newHandlerStack(testHandler)
handler := server.newHandlerStack(context.Background(), testHandler)

// Test with different origin than configured
req := httptest.NewRequest("OPTIONS", "/", nil)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestServer_CORSConfigurationPassing(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler := server.newHandlerStack(testHandler)
handler := server.newHandlerStack(context.Background(), testHandler)

req := httptest.NewRequest("OPTIONS", "/", nil)
req.Header.Set("Origin", tt.testOrigin)
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestServer_AuthMiddleware(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

handler := server.newHandlerStack(testHandler)
handler := server.newHandlerStack(context.Background(), testHandler)

req := httptest.NewRequest("GET", "/", nil)
if tt.token != "" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
golang.org/x/crypto v0.49.0
golang.org/x/sync v0.20.0
golang.org/x/text v0.35.0
golang.org/x/time v0.15.0
google.golang.org/grpc v1.79.3
google.golang.org/protobuf v1.36.11
)
Expand Down Expand Up @@ -375,7 +376,6 @@ require (
golang.org/x/sys v0.42.0 // indirect
golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect
golang.org/x/term v0.41.0 // indirect
golang.org/x/time v0.15.0 // indirect
golang.org/x/tools v0.43.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
gonum.org/v1/gonum v0.17.0 // indirect
Expand Down
Loading