4 changes: 4 additions & 0 deletions erpc/networks_multiplexer_test.go
@@ -298,6 +298,7 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) {
 		}).
 		Persist().
 		Reply(200).
+		Delay(50 * time.Millisecond).
 		JSON(map[string]interface{}{
 			"jsonrpc": "2.0",
 			"id": 1,
@@ -308,17 +309,20 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) {
 	requestBody := []byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBalance","params":["0x1234567890123456789012345678901234567890","0x10"]}`)
 
 	var wg sync.WaitGroup
+	start := make(chan struct{})
 	wg.Add(2)
 	for i := 0; i < 2; i++ {
 		go func() {
 			defer wg.Done()
+			<-start
 			req := common.NewNormalizedRequest(requestBody)
 			resp, err := network.Forward(ctx, req)
 			require.NoError(t, err)
 			require.NotNil(t, resp)
 			resp.Release()
 		}()
 	}
+	close(start)
 	wg.Wait()
 
 	assert.Equal(t, int32(1), upstreamRequestCount.Load(), "initial burst should multiplex to a single upstream call")
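The added `Delay(50 * time.Millisecond)` and the `start` gate work together: the gate releases both goroutines at once, and the delay keeps the first response from completing before the second request registers with the multiplexer, so the `upstreamRequestCount` assertion is deterministic. A minimal standalone sketch of the closed-channel gate pattern (illustrative code, not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-start // every worker parks here until the gate opens
			fmt.Printf("worker %d fired\n", id)
		}(i)
	}
	// Closing the channel unblocks all receivers at once,
	// releasing the workers as a single simultaneous burst.
	close(start)
	wg.Wait()
}
```

Without the gate, goroutine scheduling could let the first `Forward` complete before the second even starts, making the multiplexing assertion flaky.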
170 changes: 132 additions & 38 deletions upstream/ratelimiter_budget.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -19,6 +20,22 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
+type doLimitResult struct {
+	statuses []*pb.RateLimitResponse_DescriptorStatus
+	panicErr error
+}
+
+type rateLimitEvalContext struct {
+	methodPattern string
+	scope         string
+	projectId     string
+	networkLabel  string
+	userLabel     string
+	agentName     string
+	method        string
+	observe       func(string)
+}
+
 type RateLimiterBudget struct {
 	logger *zerolog.Logger
 	Id     string
Expand Down Expand Up @@ -176,7 +193,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri

// Single rule: evaluate directly without goroutine overhead
if len(rules) == 1 {
allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, networkLabel)
allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, agentName, networkLabel)
if !allowed {
telemetry.CounterHandle(
telemetry.MetricRateLimitsTotal,
@@ -198,7 +215,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri
 				resultCh <- ruleResult{rule: r, allowed: true}
 				return
 			}
-			allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, networkLabel)
+			allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, agentName, networkLabel)
 			if !allowed {
 				blocked.Store(true)
 			}
Expand Down Expand Up @@ -228,7 +245,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri

// evaluateRule checks a single rate limit rule against the cache.
// Returns true if allowed, false if over limit.
func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, networkLabel string) bool {
func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, agentName, networkLabel string) bool {
evalStartedAt := time.Now()
scope := rule.Config.ScopeString()
methodPattern := normalizeRateLimitMethodLabel(rule.Config.Method)
@@ -241,6 +258,16 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,
 			outcome,
 		).Observe(time.Since(evalStartedAt).Seconds())
 	}
+	evalCtx := rateLimitEvalContext{
+		methodPattern: methodPattern,
+		scope:         scope,
+		projectId:     projectId,
+		networkLabel:  networkLabel,
+		userLabel:     userLabel,
+		agentName:     agentName,
+		method:        method,
+		observe:       observeEvaluation,
+	}
 
 	cache := b.getCache()
 	if cache == nil {
Expand Down Expand Up @@ -285,28 +312,34 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,

Comment thread
0x666c6f marked this conversation as resolved.
var statuses []*pb.RateLimitResponse_DescriptorStatus
var timedOut bool
var waitDuration time.Duration
var panicErr error
waitStartedAt := time.Now()
if b.maxTimeout > 0 {
statuses, timedOut, waitDuration = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
statuses, timedOut, panicErr = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
} else {
waitStartedAt := time.Now()
statuses = cache.DoLimit(ctx, rlReq, limits)
waitDuration = time.Since(waitStartedAt)
statuses, panicErr = b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
}
waitDuration := time.Since(waitStartedAt)

if timedOut {
telemetry.ObserverHandle(
telemetry.MetricRateLimiterPermitWaitDuration,
b.Id,
methodPattern,
scope,
return b.recordFailOpen(
doSpan,
waitDuration,
evalCtx,
"timeout_fail_open",
).Observe(waitDuration.Seconds())
observeEvaluation("timeout_fail_open")
telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen)
doSpan.SetAttributes(attribute.String("result", "timeout_fail_open"))
doSpan.End()
return true // fail-open
"limit_timeout",
nil,
)
}
if panicErr != nil {
Comment thread
0x666c6f marked this conversation as resolved.
return b.recordFailOpen(
doSpan,
waitDuration,
evalCtx,
"panic_fail_open",
"limit_panic",
panicErr,
)
}

outcome := "ok"
@@ -330,6 +363,72 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,
 	return !isOverLimit
 }
 
+func (b *RateLimiterBudget) recordFailOpen(
+	doSpan trace.Span,
+	waitDuration time.Duration,
+	evalCtx rateLimitEvalContext,
+	outcome, reason string,
+	panicErr error,
+) bool {
+	telemetry.ObserverHandle(
+		telemetry.MetricRateLimiterPermitWaitDuration,
+		b.Id,
+		evalCtx.methodPattern,
+		evalCtx.scope,
+		outcome,
+	).Observe(waitDuration.Seconds())
+	evalCtx.observe(outcome)
+	telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+		evalCtx.projectId,
+		evalCtx.networkLabel,
+		evalCtx.userLabel,
+		evalCtx.agentName,
+		b.Id,
+		evalCtx.method,
+		reason,
+	).Inc()
+	telemetry.IncNetworkAttemptReason(evalCtx.projectId, evalCtx.networkLabel, evalCtx.method, telemetry.AttemptReasonFailOpen)
+	if panicErr != nil {
+		doSpan.RecordError(panicErr)
+	}
+	doSpan.SetAttributes(attribute.String("result", outcome))
+	doSpan.End()
+	return true
+}
+
+func (b *RateLimiterBudget) doLimitSafely(
+	ctx context.Context,
+	cache limiter.RateLimitCache,
+	rlReq *pb.RateLimitRequest,
+	limits []*config.RateLimit,
+	method, userLabel, networkLabel string,
+) (statuses []*pb.RateLimitResponse_DescriptorStatus, panicErr error) {
+	defer func() {
+		if rec := recover(); rec != nil {
+			panicErr = fmt.Errorf("panic during rate limiter DoLimit: %v", rec)
+			telemetry.MetricUnexpectedPanicTotal.WithLabelValues(
+				"ratelimiter-do-limit",
+				fmt.Sprintf("budget:%s", b.Id),
+				common.ErrorFingerprint(rec),
+			).Inc()
+			b.logger.Error().
+				Str("budget", b.Id).
+				Str("method", method).
+				Str("user", userLabel).
+				Str("network", networkLabel).
+				Interface("panic", rec).
+				Str("stack", string(debug.Stack())).
+				Msg("panic recovered during rate limiter DoLimit (failing open)")
+
+			if b.registry != nil {
+				b.registry.onCacheFailure(cache, panicErr)
+			}
+		}
+	}()
+
+	return cache.DoLimit(ctx, rlReq, limits), nil
+}
+
 // statsKeySuffix returns the pre-computed suffix for stats key.
 func (r *RateLimitRule) statsKeySuffix() string {
 	suffix := ""
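`doLimitSafely` above turns a panic from `DoLimit` into an ordinary error through its named return value: the deferred closure runs while the panic unwinds, and assigning to `panicErr` there rewrites what the function returns. A reduced sketch of just that mechanism, with illustrative names independent of the eRPC types:

```go
package main

import "fmt"

// callSafely runs fn and converts any panic into a returned error
// by assigning to the named return value inside the deferred closure.
func callSafely(fn func() int) (result int, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			// This assignment rewrites the function's return values
			// even though fn never returned normally.
			err = fmt.Errorf("recovered panic: %v", rec)
		}
	}()
	return fn(), nil
}

func main() {
	_, err := callSafely(func() int { panic("boom") })
	fmt.Println(err) // recovered panic: boom
}
```

In the PR, the same recover block additionally increments `MetricUnexpectedPanicTotal` and notifies the registry via `onCacheFailure` before the permit check fails open.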
@@ -345,31 +444,33 @@ func (r *RateLimitRule) statsKeySuffix() string {
 	return suffix
 }
 
-// doLimitWithTimeout executes DoLimit with a timeout.
-// Returns (statuses, timedOut). On timeout, returns (nil, true) and records fail-open metric.
+// doLimitWithTimeout executes doLimitSafely with a timeout.
+// Returns (statuses, timedOut, panicErr).
+// On timeout, returns (nil, true, nil).
+// On panic from DoLimit, returns (nil, false, err).
 func (b *RateLimiterBudget) doLimitWithTimeout(
 	ctx context.Context,
 	cache limiter.RateLimitCache,
 	rlReq *pb.RateLimitRequest,
 	limits []*config.RateLimit,
 	method, userLabel, networkLabel string,
-) ([]*pb.RateLimitResponse_DescriptorStatus, bool, time.Duration) {
-	waitStartedAt := time.Now()
-	resultCh := make(chan []*pb.RateLimitResponse_DescriptorStatus, 1)
+) ([]*pb.RateLimitResponse_DescriptorStatus, bool, error) {
+	resultCh := make(chan doLimitResult, 1)
 	go func() {
-		resultCh <- cache.DoLimit(ctx, rlReq, limits)
+		statuses, panicErr := b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
+		resultCh <- doLimitResult{statuses: statuses, panicErr: panicErr}
 	}()
 
 	timer := time.NewTimer(b.maxTimeout)
 	select {
-	case statuses := <-resultCh:
+	case result := <-resultCh:
 		if !timer.Stop() {
 			select {
 			case <-timer.C:
 			default:
 			}
 		}
-		return statuses, false, time.Since(waitStartedAt)
+		return result.statuses, false, result.panicErr
 
 	case <-timer.C:
 		b.logger.Warn().
@@ -378,16 +479,9 @@ func (b *RateLimiterBudget) doLimitWithTimeout(
 			Str("budget", b.Id).
 			Str("method", method).
 			Dur("timeout", b.maxTimeout).
 			Msg("rate limiter timeout exceeded, failing open")
 
-		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
-			"", // projectId not available here
-			networkLabel,
-			userLabel,
-			"", // agentName not available here
-			b.Id,
-			method,
-			"limit_timeout",
-		).Inc()
-
-		return nil, true, time.Since(waitStartedAt)
+		// The detached DoLimit goroutine may still finish or panic after this return.
+		// That late panic is still counted via MetricUnexpectedPanicTotal and onCacheFailure,
+		// but this caller has already recorded timeout_fail_open for the permit attempt.
+		return nil, true, nil
 	}
 }
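Two details carry the timeout path in `doLimitWithTimeout`: the result channel is buffered with capacity 1, so the detached goroutine can always complete its send and exit even after the caller has returned on timeout (no goroutine leak), and a timer whose `Stop` reports it already fired gets its channel drained so no stale tick lingers. A standalone sketch of the same shape (illustrative names, not the PR's API):

```go
package main

import (
	"fmt"
	"time"
)

// runWithTimeout waits up to limit for work() to finish.
// The buffered channel lets the worker goroutine complete its send
// and exit even if the caller has already returned on timeout.
func runWithTimeout(limit time.Duration, work func() string) (string, bool) {
	resultCh := make(chan string, 1) // capacity 1: the send never blocks
	go func() { resultCh <- work() }()

	timer := time.NewTimer(limit)
	select {
	case res := <-resultCh:
		// Stop the timer; if it already fired, drain its channel
		// so no stale value is left behind.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		return res, false
	case <-timer.C:
		return "", true // timed out; the worker finishes in the background
	}
}

func main() {
	res, timedOut := runWithTimeout(100*time.Millisecond, func() string {
		time.Sleep(10 * time.Millisecond)
		return "done"
	})
	fmt.Println(res, timedOut) // done false
}
```

On timeout the worker keeps running to completion in the background, which is exactly why the PR routes the detached goroutine through `doLimitSafely`: a late panic is still recorded even though the caller has already failed open.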