From c87462db61556ee3584d24df15a2227b8563c612 Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 13:18:22 +0200 Subject: [PATCH 1/6] fix(ratelimiter): fail open on redis panics --- upstream/ratelimiter_budget.go | 81 ++++++++++++++-- upstream/ratelimiter_budget_metrics_test.go | 101 +++++++++++++++----- upstream/ratelimiter_registry.go | 14 ++- 3 files changed, 164 insertions(+), 32 deletions(-) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index 2640f1c51..b40c9a4e0 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "runtime/debug" "sync" "sync/atomic" "time" @@ -19,6 +20,11 @@ import ( "go.opentelemetry.io/otel/trace" ) +type doLimitResult struct { + statuses []*pb.RateLimitResponse_DescriptorStatus + panicErr error +} + type RateLimiterBudget struct { logger *zerolog.Logger Id string @@ -285,12 +291,13 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, var statuses []*pb.RateLimitResponse_DescriptorStatus var timedOut bool + var panicErr error var waitDuration time.Duration if b.maxTimeout > 0 { - statuses, timedOut, waitDuration = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel) + statuses, timedOut, panicErr, waitDuration = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel) } else { waitStartedAt := time.Now() - statuses = cache.DoLimit(ctx, rlReq, limits) + statuses, panicErr = b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel) waitDuration = time.Since(waitStartedAt) } @@ -308,6 +315,30 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, doSpan.End() return true // fail-open } + if panicErr != nil { + telemetry.ObserverHandle( + telemetry.MetricRateLimiterPermitWaitDuration, + b.Id, + methodPattern, + scope, + "panic_fail_open", + ).Observe(waitDuration.Seconds()) + observeEvaluation("panic_fail_open") + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + "", // projectId not available here + networkLabel, + userLabel, + "", // agentName not available here + b.Id, + method, + "limit_panic", + ).Inc() + telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) + doSpan.RecordError(panicErr) + doSpan.SetAttributes(attribute.String("result", "panic_fail_open")) + doSpan.End() + return true // fail-open + } outcome := "ok" isOverLimit := len(statuses) > 0 && statuses[0].Code == pb.RateLimitResponse_OVER_LIMIT @@ -330,6 +361,39 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, return !isOverLimit } +func (b *RateLimiterBudget) doLimitSafely( + ctx context.Context, + cache limiter.RateLimitCache, + rlReq *pb.RateLimitRequest, + limits []*config.RateLimit, + method, userLabel, networkLabel string, +) (statuses []*pb.RateLimitResponse_DescriptorStatus, panicErr error) { + defer func() { + if rec := recover(); rec != nil { + panicErr = fmt.Errorf("panic during rate limiter DoLimit: %v", rec) + telemetry.MetricUnexpectedPanicTotal.WithLabelValues( + "ratelimiter-do-limit", + fmt.Sprintf("budget:%s", b.Id), + common.ErrorFingerprint(rec), + ).Inc() + b.logger.Error(). + Str("budget", b.Id). + Str("method", method). + Str("user", userLabel). + Str("network", networkLabel). + Interface("panic", rec). + Str("stack", string(debug.Stack())). + Msg("panic recovered during rate limiter DoLimit (failing open)") + + if b.registry != nil && b.registry.cfg != nil && b.registry.cfg.Store != nil && b.registry.cfg.Store.Driver == "redis" { + b.registry.onRedisCacheFailure(panicErr) + } + } + }() + + return cache.DoLimit(ctx, rlReq, limits), nil +} + // statsKeySuffix returns the pre-computed suffix for stats key. func (r *RateLimitRule) statsKeySuffix() string { suffix := "" @@ -353,23 +417,24 @@ func (b *RateLimiterBudget) doLimitWithTimeout( rlReq *pb.RateLimitRequest, limits []*config.RateLimit, method, userLabel, networkLabel string, -) ([]*pb.RateLimitResponse_DescriptorStatus, bool, time.Duration) { +) ([]*pb.RateLimitResponse_DescriptorStatus, bool, error, time.Duration) { waitStartedAt := time.Now() - resultCh := make(chan []*pb.RateLimitResponse_DescriptorStatus, 1) + resultCh := make(chan doLimitResult, 1) go func() { - resultCh <- cache.DoLimit(ctx, rlReq, limits) + statuses, panicErr := b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel) + resultCh <- doLimitResult{statuses: statuses, panicErr: panicErr} }() timer := time.NewTimer(b.maxTimeout) select { - case statuses := <-resultCh: + case result := <-resultCh: if !timer.Stop() { select { case <-timer.C: default: } } - return statuses, false, time.Since(waitStartedAt) + return result.statuses, false, result.panicErr, time.Since(waitStartedAt) case <-timer.C: b.logger.Warn(). @@ -388,6 +453,6 @@ func (b *RateLimiterBudget) doLimitWithTimeout( "limit_timeout", ).Inc() - return nil, true, time.Since(waitStartedAt) + return nil, true, nil, time.Since(waitStartedAt) } } diff --git a/upstream/ratelimiter_budget_metrics_test.go b/upstream/ratelimiter_budget_metrics_test.go index 8493f4d98..b53946b10 100644 --- a/upstream/ratelimiter_budget_metrics_test.go +++ b/upstream/ratelimiter_budget_metrics_test.go @@ -10,8 +10,8 @@ import ( "github.com/envoyproxy/ratelimit/src/limiter" "github.com/erpc/erpc/common" "github.com/erpc/erpc/telemetry" + "github.com/erpc/erpc/util" "github.com/prometheus/client_golang/prometheus" - promUtil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -34,6 +34,16 @@ func (c *delayedRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, [ func (c *delayedRateLimitCache) Flush() {} +type panicRateLimitCache struct{} + +var _ limiter.RateLimitCache = (*panicRateLimitCache)(nil) + +func (c *panicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus { + panic("EOF") +} + +func (c *panicRateLimitCache) Flush() {} + func histogramSampleCount(t *testing.T, hv *prometheus.HistogramVec, labels ...string) uint64 { t.Helper() obs, err := hv.GetMetricWithLabelValues(labels...) @@ -118,7 +128,6 @@ func TestRateLimiterBudget_PermitTimingMetrics_Ok(t *testing.T) { func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { budget := newTestBudget(t) - telemetry.MetricNetworkAttemptReasonTotal.Reset() projectID := "project-a" budget.maxTimeout = 10 * time.Millisecond budget.registry.cacheMu.Lock() @@ -144,17 +153,6 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { "", "timeout_fail_open", ) - beforeFailOpen := promUtil.ToFloat64( - telemetry.MetricNetworkAttemptReasonTotal.WithLabelValues( - projectID, - "n/a", - "eth_test", - telemetry.AttemptReasonFailOpen, - telemetry.MetricsVariantLabel(), - telemetry.MetricsReleaseLabel(), - ), - ) - ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) @@ -175,20 +173,77 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { "", "timeout_fail_open", ) - afterFailOpen := promUtil.ToFloat64( - telemetry.MetricNetworkAttemptReasonTotal.WithLabelValues( - projectID, - "n/a", - "eth_test", - telemetry.AttemptReasonFailOpen, - telemetry.MetricsVariantLabel(), - telemetry.MetricsReleaseLabel(), - ), + assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1)) + assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1)) +} + +func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { + budget := newTestBudget(t) + telemetry.MetricNetworkAttemptReasonTotal.Reset() + projectID := "project-a" + budget.registry.cfg.Store = &common.RateLimitStoreConfig{ + Driver: "redis", + Redis: &common.RedisConnectorConfig{URI: "redis://test"}, + } + budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{ + TaskTimeout: time.Second, + AutoRetry: false, + RetryFactor: 1, + RetryMinDelay: time.Second, + RetryMaxDelay: time.Second, + }) + require.NoError(t, budget.registry.initializer.ExecuteTasks( + context.Background(), + util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }), + )) + budget.registry.cacheMu.Lock() + budget.registry.envoyCache = &panicRateLimitCache{} + budget.registry.cacheMu.Unlock() + + beforeEval := histogramSampleCount( + t, + telemetry.MetricRateLimiterPermitEvaluationDuration, + "test-budget", + "eth_test", + "", + "panic_fail_open", + ) + beforeWait := histogramSampleCount( + t, + telemetry.MetricRateLimiterPermitWaitDuration, + "test-budget", + "eth_test", + "", + "panic_fail_open", ) + ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") + require.NoError(t, err) + assert.True(t, ok) + assert.Nil(t, budget.registry.GetCache()) + status := budget.registry.initializer.Status() + require.Len(t, status.Tasks, 1) + assert.Equal(t, util.TaskFailed, status.Tasks[0].State) + assert.Error(t, status.Tasks[0].Err) + + afterEval := histogramSampleCount( + t, + telemetry.MetricRateLimiterPermitEvaluationDuration, + "test-budget", + "eth_test", + "", + "panic_fail_open", + ) + afterWait := histogramSampleCount( + t, + telemetry.MetricRateLimiterPermitWaitDuration, + "test-budget", + "eth_test", + "", + "panic_fail_open", + ) assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1)) assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1)) - assert.Equal(t, beforeFailOpen+1, afterFailOpen) } func TestNormalizeRateLimitMethodLabel(t *testing.T) { diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index 6afdb48fa..3b606c4f6 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -34,6 +34,8 @@ type RateLimitersRegistry struct { initializer *util.Initializer } +const redisRateLimiterConnectTaskName = "redis-ratelimiter-connect" + func NewRateLimitersRegistry(appCtx context.Context, cfg *common.RateLimiterConfig, logger *zerolog.Logger) (*RateLimitersRegistry, error) { r := &RateLimitersRegistry{ appCtx: appCtx, @@ -60,7 +62,7 @@ func (r *RateLimitersRegistry) bootstrap() error { r.initializer = util.NewInitializer(r.appCtx, r.logger, nil) // Attempt Redis connection with panic recovery - don't block startup - connectTask := util.NewBootstrapTask("redis-ratelimiter-connect", r.connectRedisTask) + connectTask := util.NewBootstrapTask(redisRateLimiterConnectTaskName, r.connectRedisTask) if err := r.initializer.ExecuteTasks(r.appCtx, connectTask); err != nil { // Cache stays nil - rate limiting will fail-open until Redis connects r.logger.Warn().Err(err).Msg("failed to initialize Redis rate limiter on first attempt (rate limiting will fail-open until connected, retrying in background)") @@ -219,6 +221,16 @@ func (r *RateLimitersRegistry) GetBudgets() []*common.RateLimitBudgetConfig { return r.cfg.Budgets } +func (r *RateLimitersRegistry) onRedisCacheFailure(err error) { + r.cacheMu.Lock() + r.envoyCache = nil + r.cacheMu.Unlock() + + if r.initializer != nil { + r.initializer.MarkTaskAsFailed(redisRateLimiterConnectTaskName, err) + } +} + // AdjustBudget updates MaxCount for all rules in a budget matching a method (supports wildcards via GetRulesByMethod). func (r *RateLimitersRegistry) AdjustBudget(budgetId string, method string, newMax uint32) error { if budgetId == "" || method == "" { From c477a774bed518a825fc9530b7f150db7d35db1c Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 13:48:38 +0200 Subject: [PATCH 2/6] fix(ratelimiter): address PR review feedback --- upstream/ratelimiter_budget.go | 4 +- upstream/ratelimiter_budget_metrics_test.go | 89 +++++++++++++++++++++ upstream/ratelimiter_registry.go | 10 ++- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index b40c9a4e0..0388b6ee1 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -325,7 +325,7 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, ).Observe(waitDuration.Seconds()) observeEvaluation("panic_fail_open") telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( - "", // projectId not available here + projectId, networkLabel, userLabel, "", // agentName not available here @@ -386,7 +386,7 @@ func (b *RateLimiterBudget) doLimitSafely( Msg("panic recovered during rate limiter DoLimit (failing open)") if b.registry != nil && b.registry.cfg != nil && b.registry.cfg.Store != nil && b.registry.cfg.Store.Driver == "redis" { - b.registry.onRedisCacheFailure(panicErr) + b.registry.onRedisCacheFailure(cache, panicErr) } } }() diff --git a/upstream/ratelimiter_budget_metrics_test.go b/upstream/ratelimiter_budget_metrics_test.go index b53946b10..9f10a11ef 100644 --- a/upstream/ratelimiter_budget_metrics_test.go +++ b/upstream/ratelimiter_budget_metrics_test.go @@ -12,6 +12,7 @@ import ( "github.com/erpc/erpc/telemetry" "github.com/erpc/erpc/util" "github.com/prometheus/client_golang/prometheus" + promUtil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -44,6 +45,21 @@ func (c *panicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []* func (c *panicRateLimitCache) Flush() {} +type delayedPanicRateLimitCache struct { + delay time.Duration + paniced chan struct{} +} + +var _ limiter.RateLimitCache = (*delayedPanicRateLimitCache)(nil) + +func (c *delayedPanicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus { + defer close(c.paniced) + time.Sleep(c.delay) + panic("EOF") +} + +func (c *delayedPanicRateLimitCache) Flush() {} + func histogramSampleCount(t *testing.T, hv *prometheus.HistogramVec, labels ...string) uint64 { t.Helper() obs, err := hv.GetMetricWithLabelValues(labels...) @@ -216,6 +232,17 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { "", "panic_fail_open", ) + beforeFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_panic", + ), + ) ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) @@ -242,8 +269,70 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { "", "panic_fail_open", ) + afterFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_panic", + ), + ) assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1)) assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1)) + assert.Equal(t, beforeFailOpen+1, afterFailOpen) +} + +func TestRateLimiterBudget_LatePanicFromStaleCacheDoesNotClearHealthyReconnect(t *testing.T) { + budget := newTestBudget(t) + budget.registry.cfg.Store = &common.RateLimitStoreConfig{ + Driver: "redis", + Redis: &common.RedisConnectorConfig{URI: "redis://test"}, + } + budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{ + TaskTimeout: time.Second, + AutoRetry: false, + RetryFactor: 1, + RetryMinDelay: time.Second, + RetryMaxDelay: time.Second, + }) + require.NoError(t, budget.registry.initializer.ExecuteTasks( + context.Background(), + util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }), + )) + + staleCache := &delayedPanicRateLimitCache{ + delay: 40 * time.Millisecond, + paniced: make(chan struct{}), + } + healthyCache := &delayedRateLimitCache{statuses: []*pb.RateLimitResponse_DescriptorStatus{}} + budget.maxTimeout = 10 * time.Millisecond + + budget.registry.cacheMu.Lock() + budget.registry.envoyCache = staleCache + budget.registry.cacheMu.Unlock() + + ok, err := budget.TryAcquirePermit(context.Background(), "project-a", nil, "eth_test", "", "", "", "") + require.NoError(t, err) + assert.True(t, ok) + + budget.registry.cacheMu.Lock() + budget.registry.envoyCache = healthyCache + budget.registry.cacheMu.Unlock() + + select { + case <-staleCache.paniced: + case <-time.After(500 * time.Millisecond): + t.Fatal("timed out waiting for stale cache panic") + } + + assert.Same(t, healthyCache, budget.registry.GetCache()) + + status := budget.registry.initializer.Status() + require.Len(t, status.Tasks, 1) + assert.Equal(t, util.TaskSucceeded, status.Tasks[0].State) } func TestNormalizeRateLimitMethodLabel(t *testing.T) { diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index 3b606c4f6..a2eab9a1c 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -221,8 +221,16 @@ func (r *RateLimitersRegistry) GetBudgets() []*common.RateLimitBudgetConfig { return r.cfg.Budgets } -func (r *RateLimitersRegistry) onRedisCacheFailure(err error) { +func (r *RateLimitersRegistry) onRedisCacheFailure(failedCache limiter.RateLimitCache, err error) { + if failedCache == nil { + return + } + r.cacheMu.Lock() + if r.envoyCache != failedCache { + r.cacheMu.Unlock() + return + } r.envoyCache = nil r.cacheMu.Unlock() From 7432d4cd89dd22059eaeaaf031ae435d37bd7473 Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 14:17:14 +0200 Subject: [PATCH 3/6] fix(ratelimiter): address PR review findings --- upstream/ratelimiter_budget.go | 31 +++--- upstream/ratelimiter_budget_metrics_test.go | 105 ++++++++++++++++++-- upstream/ratelimiter_registry.go | 8 +- 3 files changed, 120 insertions(+), 24 deletions(-) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index 0388b6ee1..184d57454 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -310,6 +310,15 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, "timeout_fail_open", ).Observe(waitDuration.Seconds()) observeEvaluation("timeout_fail_open") + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectId, + networkLabel, + userLabel, + "", // agentName not passed to evaluateRule + b.Id, + method, + "limit_timeout", + ).Inc() telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) doSpan.SetAttributes(attribute.String("result", "timeout_fail_open")) doSpan.End() @@ -328,7 +337,7 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, projectId, networkLabel, userLabel, - "", // agentName not available here + "", // agentName not passed to evaluateRule b.Id, method, "limit_panic", @@ -385,8 +394,8 @@ func (b *RateLimiterBudget) doLimitSafely( Str("stack", string(debug.Stack())). Msg("panic recovered during rate limiter DoLimit (failing open)") - if b.registry != nil && b.registry.cfg != nil && b.registry.cfg.Store != nil && b.registry.cfg.Store.Driver == "redis" { - b.registry.onRedisCacheFailure(cache, panicErr) + if b.registry != nil { + b.registry.onCacheFailure(cache, panicErr) } } }() @@ -409,8 +418,10 @@ func (r *RateLimitRule) statsKeySuffix() string { return suffix } -// doLimitWithTimeout executes DoLimit with a timeout. -// Returns (statuses, timedOut). On timeout, returns (nil, true) and records fail-open metric. +// doLimitWithTimeout executes doLimitSafely with a timeout. +// Returns (statuses, timedOut, panicErr, waitDuration). +// On timeout, returns (nil, true, nil, elapsed). +// On panic from DoLimit, returns (nil, false, err, elapsed). func (b *RateLimiterBudget) doLimitWithTimeout( ctx context.Context, cache limiter.RateLimitCache, @@ -443,16 +454,6 @@ func (b *RateLimiterBudget) doLimitWithTimeout( Dur("timeout", b.maxTimeout). Msg("rate limiter timeout exceeded, failing open") - telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( - "", // projectId not available here - networkLabel, - userLabel, - "", // agentName not available here - b.Id, - method, - "limit_timeout", - ).Inc() - return nil, true, nil, time.Since(waitStartedAt) } } diff --git a/upstream/ratelimiter_budget_metrics_test.go b/upstream/ratelimiter_budget_metrics_test.go index 9f10a11ef..edf9696a5 100644 --- a/upstream/ratelimiter_budget_metrics_test.go +++ b/upstream/ratelimiter_budget_metrics_test.go @@ -46,14 +46,14 @@ func (c *panicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []* func (c *panicRateLimitCache) Flush() {} type delayedPanicRateLimitCache struct { - delay time.Duration - paniced chan struct{} + delay time.Duration + panicked chan struct{} } var _ limiter.RateLimitCache = (*delayedPanicRateLimitCache)(nil) func (c *delayedPanicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus { - defer close(c.paniced) + defer close(c.panicked) time.Sleep(c.delay) panic("EOF") } @@ -169,6 +169,17 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { "", "timeout_fail_open", ) + beforeFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_timeout", + ), + ) ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) @@ -189,13 +200,24 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { "", "timeout_fail_open", ) + afterFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_timeout", + ), + ) assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1)) assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1)) + assert.Equal(t, beforeFailOpen+1, afterFailOpen) } func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { budget := newTestBudget(t) - telemetry.MetricNetworkAttemptReasonTotal.Reset() projectID := "project-a" budget.registry.cfg.Store = &common.RateLimitStoreConfig{ Driver: "redis", @@ -243,6 +265,13 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { "limit_panic", ), ) + beforeUnexpectedPanic := promUtil.ToFloat64( + telemetry.MetricUnexpectedPanicTotal.WithLabelValues( + "ratelimiter-do-limit", + "budget:test-budget", + common.ErrorFingerprint("EOF"), + ), + ) ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) @@ -280,9 +309,71 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { "limit_panic", ), ) + afterUnexpectedPanic := promUtil.ToFloat64( + telemetry.MetricUnexpectedPanicTotal.WithLabelValues( + "ratelimiter-do-limit", + "budget:test-budget", + common.ErrorFingerprint("EOF"), + ), + ) assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1)) assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1)) assert.Equal(t, beforeFailOpen+1, afterFailOpen) + assert.Equal(t, beforeUnexpectedPanic+1, afterUnexpectedPanic) +} + +func TestRateLimiterBudget_PermitTimingMetrics_PanicBeforeTimeoutFailOpen(t *testing.T) { + budget := newTestBudget(t) + projectID := "project-a" + budget.registry.cfg.Store = &common.RateLimitStoreConfig{ + Driver: "redis", + Redis: &common.RedisConnectorConfig{URI: "redis://test"}, + } + budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{ + TaskTimeout: time.Second, + AutoRetry: false, + RetryFactor: 1, + RetryMinDelay: time.Second, + RetryMaxDelay: time.Second, + }) + require.NoError(t, budget.registry.initializer.ExecuteTasks( + context.Background(), + util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }), + )) + budget.maxTimeout = 100 * time.Millisecond + budget.registry.cacheMu.Lock() + budget.registry.envoyCache = &panicRateLimitCache{} + budget.registry.cacheMu.Unlock() + + beforeFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_panic", + ), + ) + + ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") + require.NoError(t, err) + assert.True(t, ok) + assert.Nil(t, budget.registry.GetCache()) + + afterFailOpen := promUtil.ToFloat64( + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectID, + "", + "", + "", + "test-budget", + "eth_test", + "limit_panic", + ), + ) + assert.Equal(t, beforeFailOpen+1, afterFailOpen) } func TestRateLimiterBudget_LatePanicFromStaleCacheDoesNotClearHealthyReconnect(t *testing.T) { @@ -304,8 +395,8 @@ func TestRateLimiterBudget_LatePanicFromStaleCacheDoesNotClearHealthyReconnect(t )) staleCache := &delayedPanicRateLimitCache{ - delay: 40 * time.Millisecond, - paniced: make(chan struct{}), + delay: 40 * time.Millisecond, + panicked: make(chan struct{}), } healthyCache := &delayedRateLimitCache{statuses: []*pb.RateLimitResponse_DescriptorStatus{}} budget.maxTimeout = 10 * time.Millisecond @@ -323,7 +414,7 @@ func TestRateLimiterBudget_LatePanicFromStaleCacheDoesNotClearHealthyReconnect(t budget.registry.cacheMu.Unlock() select { - case <-staleCache.paniced: + case <-staleCache.panicked: case <-time.After(500 * time.Millisecond): t.Fatal("timed out waiting for stale cache panic") } diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index a2eab9a1c..f71a74839 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -221,7 +221,7 @@ func (r *RateLimitersRegistry) GetBudgets() []*common.RateLimitBudgetConfig { return r.cfg.Budgets } -func (r *RateLimitersRegistry) onRedisCacheFailure(failedCache limiter.RateLimitCache, err error) { +func (r *RateLimitersRegistry) onCacheFailure(failedCache limiter.RateLimitCache, err error) { if failedCache == nil { return } @@ -234,9 +234,13 @@ func (r *RateLimitersRegistry) onRedisCacheFailure(failedCache limiter.RateLimit r.envoyCache = nil r.cacheMu.Unlock() - if r.initializer != nil { + if r.cfg != nil && r.cfg.Store != nil && r.cfg.Store.Driver == "redis" && r.initializer != nil { + r.logger.Warn().Err(err).Msg("cleared Redis rate limiter cache after failure, marking for reconnection") r.initializer.MarkTaskAsFailed(redisRateLimiterConnectTaskName, err) + return } + + r.logger.Warn().Err(err).Msg("cleared rate limiter cache after failure") } // AdjustBudget updates MaxCount for all rules in a budget matching a method (supports wildcards via GetRulesByMethod). From 69068b4eac220a7a6083f072d5cbfa11e03fd643 Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 14:30:38 +0200 Subject: [PATCH 4/6] fix(ratelimiter): address follow-up review findings --- upstream/ratelimiter_budget.go | 107 ++++++++++++-------- upstream/ratelimiter_budget_metrics_test.go | 52 ++++++---- upstream/ratelimiter_registry.go | 2 +- 3 files changed, 96 insertions(+), 65 deletions(-) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index 184d57454..65153d737 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -182,7 +182,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri // Single rule: evaluate directly without goroutine overhead if len(rules) == 1 { - allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, networkLabel) + allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, agentName, networkLabel) if !allowed { telemetry.CounterHandle( telemetry.MetricRateLimitsTotal, @@ -204,7 +204,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri resultCh <- ruleResult{rule: r, allowed: true} return } - allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, networkLabel) + allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, agentName, networkLabel) if !allowed { blocked.Store(true) } @@ -234,7 +234,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri // evaluateRule checks a single rate limit rule against the cache. // Returns true if allowed, false if over limit. -func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, networkLabel string) bool { +func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, agentName, networkLabel string) bool { evalStartedAt := time.Now() scope := rule.Config.ScopeString() methodPattern := normalizeRateLimitMethodLabel(rule.Config.Method) @@ -292,61 +292,47 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, var statuses []*pb.RateLimitResponse_DescriptorStatus var timedOut bool var panicErr error - var waitDuration time.Duration + waitStartedAt := time.Now() if b.maxTimeout > 0 { - statuses, timedOut, panicErr, waitDuration = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel) + statuses, timedOut, panicErr = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel) } else { - waitStartedAt := time.Now() statuses, panicErr = b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel) - waitDuration = time.Since(waitStartedAt) } + waitDuration := time.Since(waitStartedAt) if timedOut { - telemetry.ObserverHandle( - telemetry.MetricRateLimiterPermitWaitDuration, - b.Id, + return b.recordFailOpen( + doSpan, + waitDuration, methodPattern, scope, - "timeout_fail_open", - ).Observe(waitDuration.Seconds()) - observeEvaluation("timeout_fail_open") - telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectId, networkLabel, userLabel, - "", // agentName not passed to evaluateRule - b.Id, + agentName, method, + "timeout_fail_open", "limit_timeout", - ).Inc() - telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) - doSpan.SetAttributes(attribute.String("result", "timeout_fail_open")) - doSpan.End() - return true // fail-open + nil, + observeEvaluation, + ) } if panicErr != nil { - telemetry.ObserverHandle( - telemetry.MetricRateLimiterPermitWaitDuration, - b.Id, + return b.recordFailOpen( + doSpan, + waitDuration, methodPattern, scope, - "panic_fail_open", - ).Observe(waitDuration.Seconds()) - observeEvaluation("panic_fail_open") - telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectId, networkLabel, userLabel, - "", // agentName not passed to evaluateRule - b.Id, + agentName, method, + "panic_fail_open", "limit_panic", - ).Inc() - telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) - doSpan.RecordError(panicErr) - doSpan.SetAttributes(attribute.String("result", "panic_fail_open")) - doSpan.End() - return true // fail-open + panicErr, + observeEvaluation, + ) } outcome := "ok" @@ -370,6 +356,39 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, return !isOverLimit } +func (b *RateLimiterBudget) recordFailOpen( + doSpan trace.Span, + waitDuration time.Duration, + methodPattern, scope, projectId, networkLabel, userLabel, agentName, method, outcome, reason string, + panicErr error, + observeEvaluation func(string), +) bool { + telemetry.ObserverHandle( + telemetry.MetricRateLimiterPermitWaitDuration, + b.Id, + methodPattern, + scope, + outcome, + ).Observe(waitDuration.Seconds()) + observeEvaluation(outcome) + telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( + projectId, + networkLabel, + userLabel, + agentName, + b.Id, + method, + reason, + ).Inc() + telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) + if panicErr != nil { + doSpan.RecordError(panicErr) + } + doSpan.SetAttributes(attribute.String("result", outcome)) + doSpan.End() + return true +} + func (b *RateLimiterBudget) doLimitSafely( ctx context.Context, cache limiter.RateLimitCache, @@ -419,17 +438,16 @@ func (r *RateLimitRule) statsKeySuffix() string { } // doLimitWithTimeout executes doLimitSafely with a timeout. -// Returns (statuses, timedOut, panicErr, waitDuration). -// On timeout, returns (nil, true, nil, elapsed). -// On panic from DoLimit, returns (nil, false, err, elapsed). +// Returns (statuses, timedOut, panicErr). +// On timeout, returns (nil, true, nil). +// On panic from DoLimit, returns (nil, false, err). func (b *RateLimiterBudget) doLimitWithTimeout( ctx context.Context, cache limiter.RateLimitCache, rlReq *pb.RateLimitRequest, limits []*config.RateLimit, method, userLabel, networkLabel string, -) ([]*pb.RateLimitResponse_DescriptorStatus, bool, error, time.Duration) { - waitStartedAt := time.Now() +) ([]*pb.RateLimitResponse_DescriptorStatus, bool, error) { resultCh := make(chan doLimitResult, 1) go func() { statuses, panicErr := b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel) @@ -445,7 +463,7 @@ func (b *RateLimiterBudget) doLimitWithTimeout( default: } } - return result.statuses, false, result.panicErr, time.Since(waitStartedAt) + return result.statuses, false, result.panicErr case <-timer.C: b.logger.Warn(). @@ -454,6 +472,9 @@ func (b *RateLimiterBudget) doLimitWithTimeout( Dur("timeout", b.maxTimeout). Msg("rate limiter timeout exceeded, failing open") - return nil, true, nil, time.Since(waitStartedAt) + // The detached DoLimit goroutine may still finish or panic after this return. + // That late panic is still counted via MetricUnexpectedPanicTotal and onCacheFailure, + // but this caller has already recorded timeout_fail_open for the permit attempt. + return nil, true, nil } } diff --git a/upstream/ratelimiter_budget_metrics_test.go b/upstream/ratelimiter_budget_metrics_test.go index edf9696a5..213210d2a 100644 --- a/upstream/ratelimiter_budget_metrics_test.go +++ b/upstream/ratelimiter_budget_metrics_test.go @@ -2,6 +2,7 @@ package upstream import ( "context" + "net/http" "testing" "time" @@ -97,6 +98,12 @@ func newTestBudget(t *testing.T) *RateLimiterBudget { return budget } +func newTestRequestWithAgent() *common.NormalizedRequest { + req := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":1,"method":"eth_test"}`)) + req.EnrichFromHttp(http.Header{"User-Agent": []string{"curl/8.0.1"}}, nil, common.UserAgentTrackingModeSimplified) + return req +} + func TestRateLimiterBudget_PermitTimingMetrics_Ok(t *testing.T) { budget := newTestBudget(t) @@ -145,6 +152,7 @@ func TestRateLimiterBudget_PermitTimingMetrics_Ok(t *testing.T) { func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { budget := newTestBudget(t) projectID := "project-a" + req := newTestRequestWithAgent() budget.maxTimeout = 10 * time.Millisecond budget.registry.cacheMu.Lock() budget.registry.envoyCache = &delayedRateLimitCache{ @@ -172,15 +180,15 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { beforeFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_timeout", ), ) - ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") + ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) @@ -203,9 +211,9 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { afterFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_timeout", @@ -219,6 +227,7 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) { func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { budget := newTestBudget(t) projectID := "project-a" + req := newTestRequestWithAgent() budget.registry.cfg.Store = &common.RateLimitStoreConfig{ Driver: "redis", Redis: &common.RedisConnectorConfig{URI: "redis://test"}, @@ -257,9 +266,9 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { beforeFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_panic", @@ -272,7 +281,7 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { common.ErrorFingerprint("EOF"), ), ) - ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") + ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) assert.Nil(t, budget.registry.GetCache()) @@ -301,9 +310,9 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { afterFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_panic", @@ -325,6 +334,7 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) { func TestRateLimiterBudget_PermitTimingMetrics_PanicBeforeTimeoutFailOpen(t *testing.T) { budget := newTestBudget(t) projectID := "project-a" + req := newTestRequestWithAgent() budget.registry.cfg.Store = &common.RateLimitStoreConfig{ Driver: "redis", Redis: &common.RedisConnectorConfig{URI: "redis://test"}, @@ -348,16 +358,16 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicBeforeTimeoutFailOpen(t *tes beforeFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_panic", ), ) - ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "") + ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "") require.NoError(t, err) assert.True(t, ok) assert.Nil(t, budget.registry.GetCache()) @@ -365,9 +375,9 @@ func TestRateLimiterBudget_PermitTimingMetrics_PanicBeforeTimeoutFailOpen(t *tes afterFailOpen := promUtil.ToFloat64( telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( projectID, - "", - "", - "", + "n/a", + "n/a", + "curl", "test-budget", "eth_test", "limit_panic", diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index f71a74839..7c96c175d 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -240,7 +240,7 @@ func (r *RateLimitersRegistry) onCacheFailure(failedCache limiter.RateLimitCache return } - r.logger.Warn().Err(err).Msg("cleared rate limiter cache after failure") + r.logger.Warn().Err(err).Msg("cleared rate limiter cache after failure (rate limiting disabled until restart)") } // AdjustBudget updates MaxCount for all rules in a budget matching a method (supports wildcards via GetRulesByMethod). From 46db19d32cb5cc3a9c9a6ea2c35884a5b1214134 Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 14:47:39 +0200 Subject: [PATCH 5/6] fix(ratelimiter): address latest review findings --- upstream/ratelimiter_budget.go | 61 +++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index 65153d737..95c11a893 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -25,6 +25,17 @@ type doLimitResult struct { panicErr error } +type rateLimitEvalContext struct { + methodPattern string + scope string + projectId string + networkLabel string + userLabel string + agentName string + method string + observe func(string) +} + type RateLimiterBudget struct { logger *zerolog.Logger Id string @@ -247,6 +258,16 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, outcome, ).Observe(time.Since(evalStartedAt).Seconds()) } + evalCtx := rateLimitEvalContext{ + methodPattern: methodPattern, + scope: scope, + projectId: projectId, + networkLabel: networkLabel, + userLabel: userLabel, + agentName: agentName, + method: method, + observe: observeEvaluation, + } cache := b.getCache() if cache == nil { @@ -304,34 +325,20 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, return b.recordFailOpen( doSpan, waitDuration, - methodPattern, - scope, - projectId, - networkLabel, - userLabel, - agentName, - method, + evalCtx, "timeout_fail_open", "limit_timeout", nil, - observeEvaluation, ) } if panicErr != nil { return b.recordFailOpen( doSpan, waitDuration, - methodPattern, - scope, - projectId, - networkLabel, - userLabel, - agentName, - method, + evalCtx, "panic_fail_open", "limit_panic", panicErr, - observeEvaluation, ) } @@ -359,28 +366,28 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, func (b *RateLimiterBudget) recordFailOpen( doSpan trace.Span, waitDuration time.Duration, - methodPattern, scope, projectId, networkLabel, userLabel, agentName, method, outcome, reason string, + evalCtx rateLimitEvalContext, + outcome, reason string, panicErr error, - observeEvaluation func(string), ) bool { telemetry.ObserverHandle( telemetry.MetricRateLimiterPermitWaitDuration, b.Id, - methodPattern, - scope, + evalCtx.methodPattern, + evalCtx.scope, outcome, ).Observe(waitDuration.Seconds()) - observeEvaluation(outcome) + evalCtx.observe(outcome) telemetry.MetricRateLimiterFailopenTotal.WithLabelValues( - projectId, - networkLabel, - userLabel, - agentName, + evalCtx.projectId, + evalCtx.networkLabel, + evalCtx.userLabel, + evalCtx.agentName, b.Id, - method, + evalCtx.method, reason, ).Inc() - telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen) + telemetry.IncNetworkAttemptReason(evalCtx.projectId, evalCtx.networkLabel, evalCtx.method, telemetry.AttemptReasonFailOpen) if panicErr != nil { doSpan.RecordError(panicErr) } From dae5d30d596f4413ee2800d19e4d6e2612ff18cc Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 31 Mar 2026 16:26:03 +0200 Subject: [PATCH 6/6] test(multiplexer): harden post-completion coalescing check --- erpc/networks_multiplexer_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/erpc/networks_multiplexer_test.go b/erpc/networks_multiplexer_test.go index da8395dcf..cdd6c5a2f 100644 --- a/erpc/networks_multiplexer_test.go +++ b/erpc/networks_multiplexer_test.go @@ -298,6 +298,7 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) { }). Persist(). Reply(200). + Delay(50 * time.Millisecond). JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, @@ -308,10 +309,12 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) { requestBody := []byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBalance","params":["0x1234567890123456789012345678901234567890","0x10"]}`) var wg sync.WaitGroup + start := make(chan struct{}) wg.Add(2) for i := 0; i < 2; i++ { go func() { defer wg.Done() + <-start req := common.NewNormalizedRequest(requestBody) resp, err := network.Forward(ctx, req) require.NoError(t, err) @@ -319,6 +322,7 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) { resp.Release() }() } + close(start) wg.Wait() assert.Equal(t, int32(1), upstreamRequestCount.Load(), "initial burst should multiplex to a single upstream call")