diff --git a/erpc/networks_multiplexer_test.go b/erpc/networks_multiplexer_test.go
index da8395dcf..cdd6c5a2f 100644
--- a/erpc/networks_multiplexer_test.go
+++ b/erpc/networks_multiplexer_test.go
@@ -298,6 +298,7 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) {
 		}).
 		Persist().
 		Reply(200).
+		Delay(50 * time.Millisecond).
 		JSON(map[string]interface{}{
 			"jsonrpc": "2.0",
 			"id":      1,
@@ -308,10 +309,12 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) {
 	requestBody := []byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBalance","params":["0x1234567890123456789012345678901234567890","0x10"]}`)
 
 	var wg sync.WaitGroup
+	start := make(chan struct{})
 	wg.Add(2)
 	for i := 0; i < 2; i++ {
 		go func() {
 			defer wg.Done()
+			<-start
 			req := common.NewNormalizedRequest(requestBody)
 			resp, err := network.Forward(ctx, req)
 			require.NoError(t, err)
@@ -319,6 +322,7 @@ func TestNetwork_Multiplexer_FollowersReceiveResponse(t *testing.T) {
 			resp.Release()
 		}()
 	}
+	close(start)
 	wg.Wait()
 
 	assert.Equal(t, int32(1), upstreamRequestCount.Load(), "initial burst should multiplex to a single upstream call")
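// Reviewer note: the test change above closes a race by gating both goroutines
// on a shared `start` channel, so the two Forward calls overlap inside the
// mock's 50ms response window and the multiplexer can collapse them into one
// upstream request. A minimal, self-contained sketch of the start-gate pattern
// (names below are illustrative, not from erpc):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	var mu sync.Mutex
	start := make(chan struct{})
	hits := 0
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // park until every worker is spawned
			mu.Lock()
			hits++ // stand-in for network.Forward(ctx, req)
			mu.Unlock()
		}()
	}
	close(start) // releases all parked workers at once
	wg.Wait()
	fmt.Println(hits) // 2
}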
diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go
index 2640f1c51..95c11a893 100644
--- a/upstream/ratelimiter_budget.go
+++ b/upstream/ratelimiter_budget.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -19,6 +20,22 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
+type doLimitResult struct {
+	statuses []*pb.RateLimitResponse_DescriptorStatus
+	panicErr error
+}
+
+type rateLimitEvalContext struct {
+	methodPattern string
+	scope         string
+	projectId     string
+	networkLabel  string
+	userLabel     string
+	agentName     string
+	method        string
+	observe       func(string)
+}
+
 type RateLimiterBudget struct {
 	logger *zerolog.Logger
 	Id     string
@@ -176,7 +193,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri
 
 	// Single rule: evaluate directly without goroutine overhead
 	if len(rules) == 1 {
-		allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, networkLabel)
+		allowed := b.evaluateRule(ctx, projectId, rules[0], method, clientIP, userLabel, agentName, networkLabel)
 		if !allowed {
 			telemetry.CounterHandle(
 				telemetry.MetricRateLimitsTotal,
@@ -198,7 +215,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri
 				resultCh <- ruleResult{rule: r, allowed: true}
 				return
 			}
-			allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, networkLabel)
+			allowed := b.evaluateRule(ctx, projectId, r, method, clientIP, userLabel, agentName, networkLabel)
 			if !allowed {
 				blocked.Store(true)
 			}
@@ -228,7 +245,7 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri
 
 // evaluateRule checks a single rate limit rule against the cache.
 // Returns true if allowed, false if over limit.
-func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, networkLabel string) bool {
+func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string, rule *RateLimitRule, method, clientIP, userLabel, agentName, networkLabel string) bool {
 	evalStartedAt := time.Now()
 	scope := rule.Config.ScopeString()
 	methodPattern := normalizeRateLimitMethodLabel(rule.Config.Method)
@@ -241,6 +258,16 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,
 			outcome,
 		).Observe(time.Since(evalStartedAt).Seconds())
 	}
+	evalCtx := rateLimitEvalContext{
+		methodPattern: methodPattern,
+		scope:         scope,
+		projectId:     projectId,
+		networkLabel:  networkLabel,
+		userLabel:     userLabel,
+		agentName:     agentName,
+		method:        method,
+		observe:       observeEvaluation,
+	}
 
 	cache := b.getCache()
 	if cache == nil {
@@ -285,28 +312,34 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,
 	var statuses []*pb.RateLimitResponse_DescriptorStatus
 	var timedOut bool
-	var waitDuration time.Duration
+	var panicErr error
+	waitStartedAt := time.Now()
 	if b.maxTimeout > 0 {
-		statuses, timedOut, waitDuration = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
+		statuses, timedOut, panicErr = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
 	} else {
-		waitStartedAt := time.Now()
-		statuses = cache.DoLimit(ctx, rlReq, limits)
-		waitDuration = time.Since(waitStartedAt)
+		statuses, panicErr = b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
 	}
+	waitDuration := time.Since(waitStartedAt)
 
 	if timedOut {
-		telemetry.ObserverHandle(
-			telemetry.MetricRateLimiterPermitWaitDuration,
-			b.Id,
-			methodPattern,
-			scope,
+		return b.recordFailOpen(
+			doSpan,
+			waitDuration,
+			evalCtx,
 			"timeout_fail_open",
-		).Observe(waitDuration.Seconds())
-		observeEvaluation("timeout_fail_open")
-		telemetry.IncNetworkAttemptReason(projectId, networkLabel, method, telemetry.AttemptReasonFailOpen)
-		doSpan.SetAttributes(attribute.String("result", "timeout_fail_open"))
-		doSpan.End()
-		return true // fail-open
+			"limit_timeout",
+			nil,
+		)
+	}
+	if panicErr != nil {
+		return b.recordFailOpen(
+			doSpan,
+			waitDuration,
+			evalCtx,
+			"panic_fail_open",
+			"limit_panic",
+			panicErr,
+		)
 	}
 
 	outcome := "ok"
@@ -330,6 +363,72 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, projectId string,
 	return !isOverLimit
 }
 
+func (b *RateLimiterBudget) recordFailOpen(
+	doSpan trace.Span,
+	waitDuration time.Duration,
+	evalCtx rateLimitEvalContext,
+	outcome, reason string,
+	panicErr error,
+) bool {
+	telemetry.ObserverHandle(
+		telemetry.MetricRateLimiterPermitWaitDuration,
+		b.Id,
+		evalCtx.methodPattern,
+		evalCtx.scope,
+		outcome,
+	).Observe(waitDuration.Seconds())
+	evalCtx.observe(outcome)
+	telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+		evalCtx.projectId,
+		evalCtx.networkLabel,
+		evalCtx.userLabel,
+		evalCtx.agentName,
+		b.Id,
+		evalCtx.method,
+		reason,
+	).Inc()
+	telemetry.IncNetworkAttemptReason(evalCtx.projectId, evalCtx.networkLabel, evalCtx.method, telemetry.AttemptReasonFailOpen)
+	if panicErr != nil {
+		doSpan.RecordError(panicErr)
+	}
+	doSpan.SetAttributes(attribute.String("result", outcome))
+	doSpan.End()
+	return true
+}
+
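// Reviewer note: recordFailOpen centralizes what used to be inlined telemetry.
// It observes the wait duration, bumps the fail-open and attempt-reason
// counters, annotates and ends the span, and always returns true (the permit
// is granted). Condensed caller shape from evaluateRule above, with the
// plumbing elided:
//
//	if timedOut {
//		return b.recordFailOpen(doSpan, waitDuration, evalCtx, "timeout_fail_open", "limit_timeout", nil)
//	}
//	if panicErr != nil {
//		return b.recordFailOpen(doSpan, waitDuration, evalCtx, "panic_fail_open", "limit_panic", panicErr)
//	}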
+func (b *RateLimiterBudget) doLimitSafely(
+	ctx context.Context,
+	cache limiter.RateLimitCache,
+	rlReq *pb.RateLimitRequest,
+	limits []*config.RateLimit,
+	method, userLabel, networkLabel string,
+) (statuses []*pb.RateLimitResponse_DescriptorStatus, panicErr error) {
+	defer func() {
+		if rec := recover(); rec != nil {
+			panicErr = fmt.Errorf("panic during rate limiter DoLimit: %v", rec)
+			telemetry.MetricUnexpectedPanicTotal.WithLabelValues(
+				"ratelimiter-do-limit",
+				fmt.Sprintf("budget:%s", b.Id),
+				common.ErrorFingerprint(rec),
+			).Inc()
+			b.logger.Error().
+				Str("budget", b.Id).
+				Str("method", method).
+				Str("user", userLabel).
+				Str("network", networkLabel).
+				Interface("panic", rec).
+				Str("stack", string(debug.Stack())).
+				Msg("panic recovered during rate limiter DoLimit (failing open)")
+
+			if b.registry != nil {
+				b.registry.onCacheFailure(cache, panicErr)
+			}
+		}
+	}()
+
+	return cache.DoLimit(ctx, rlReq, limits), nil
+}
+
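// Reviewer note: doLimitSafely depends on a Go subtlety worth calling out. A
// deferred recover() can only hand a value back to the caller through *named*
// return values, which is why the signature names both statuses and panicErr.
// A minimal, standalone sketch of the idiom (independent of erpc types):
//
//	func safely(f func() int) (result int, err error) {
//		defer func() {
//			if rec := recover(); rec != nil {
//				err = fmt.Errorf("recovered: %v", rec) // overwrites the named return
//			}
//		}()
//		return f(), nil
//	}
//
//	// safely(func() int { panic("EOF") }) returns (0, error "recovered: EOF")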
 // statsKeySuffix returns the pre-computed suffix for stats key.
 func (r *RateLimitRule) statsKeySuffix() string {
 	suffix := ""
@@ -345,31 +444,33 @@ func (r *RateLimitRule) statsKeySuffix() string {
 	return suffix
 }
 
-// doLimitWithTimeout executes DoLimit with a timeout.
-// Returns (statuses, timedOut). On timeout, returns (nil, true) and records fail-open metric.
+// doLimitWithTimeout executes doLimitSafely with a timeout.
+// Returns (statuses, timedOut, panicErr).
+// On timeout, returns (nil, true, nil).
+// On panic from DoLimit, returns (nil, false, err).
 func (b *RateLimiterBudget) doLimitWithTimeout(
 	ctx context.Context,
 	cache limiter.RateLimitCache,
 	rlReq *pb.RateLimitRequest,
 	limits []*config.RateLimit,
 	method, userLabel, networkLabel string,
-) ([]*pb.RateLimitResponse_DescriptorStatus, bool, time.Duration) {
-	waitStartedAt := time.Now()
-	resultCh := make(chan []*pb.RateLimitResponse_DescriptorStatus, 1)
+) ([]*pb.RateLimitResponse_DescriptorStatus, bool, error) {
+	resultCh := make(chan doLimitResult, 1)
 	go func() {
-		resultCh <- cache.DoLimit(ctx, rlReq, limits)
+		statuses, panicErr := b.doLimitSafely(ctx, cache, rlReq, limits, method, userLabel, networkLabel)
+		resultCh <- doLimitResult{statuses: statuses, panicErr: panicErr}
 	}()
 
 	timer := time.NewTimer(b.maxTimeout)
 	select {
-	case statuses := <-resultCh:
+	case result := <-resultCh:
 		if !timer.Stop() {
 			select {
 			case <-timer.C:
 			default:
 			}
 		}
-		return statuses, false, time.Since(waitStartedAt)
+		return result.statuses, false, result.panicErr
 
 	case <-timer.C:
 		b.logger.Warn().
@@ -378,16 +479,9 @@ func (b *RateLimiterBudget) doLimitWithTimeout(
 			Dur("timeout", b.maxTimeout).
 			Msg("rate limiter timeout exceeded, failing open")
 
-		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
-			"", // projectId not available here
-			networkLabel,
-			userLabel,
-			"", // agentName not available here
-			b.Id,
-			method,
-			"limit_timeout",
-		).Inc()
-
-		return nil, true, time.Since(waitStartedAt)
+		// The detached DoLimit goroutine may still finish or panic after this return.
+		// That late panic is still counted via MetricUnexpectedPanicTotal and onCacheFailure,
+		// but this caller has already recorded timeout_fail_open for the permit attempt.
+		return nil, true, nil
 	}
 }
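// Reviewer note: doLimitWithTimeout above uses a result channel with buffer
// size 1 so the detached goroutine can always complete its send even after the
// timeout branch has returned; with an unbuffered channel it would leak,
// blocked forever on the send. A minimal sketch of the pattern, assuming a
// generic slow call (names are illustrative, not from erpc):

package main

import (
	"fmt"
	"time"
)

// callWithTimeout runs slow() on its own goroutine and gives up after timeout.
func callWithTimeout(slow func() string, timeout time.Duration) (string, bool) {
	resultCh := make(chan string, 1) // buffered: the sender never blocks
	go func() { resultCh <- slow() }()

	timer := time.NewTimer(timeout)
	select {
	case res := <-resultCh:
		if !timer.Stop() { // drain the timer if it already fired
			select {
			case <-timer.C:
			default:
			}
		}
		return res, false
	case <-timer.C:
		// Timed out; the goroutine finishes later and is garbage collected.
		return "", true
	}
}

func main() {
	res, timedOut := callWithTimeout(func() string {
		time.Sleep(time.Millisecond)
		return "ok"
	}, time.Second)
	fmt.Println(res, timedOut) // ok false
}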
diff --git a/upstream/ratelimiter_budget_metrics_test.go b/upstream/ratelimiter_budget_metrics_test.go
index 8493f4d98..213210d2a 100644
--- a/upstream/ratelimiter_budget_metrics_test.go
+++ b/upstream/ratelimiter_budget_metrics_test.go
@@ -2,6 +2,7 @@ package upstream
 
 import (
 	"context"
+	"net/http"
 	"testing"
 	"time"
 
@@ -10,6 +11,7 @@ import (
 	"github.com/envoyproxy/ratelimit/src/limiter"
 	"github.com/erpc/erpc/common"
 	"github.com/erpc/erpc/telemetry"
+	"github.com/erpc/erpc/util"
 	"github.com/prometheus/client_golang/prometheus"
 	promUtil "github.com/prometheus/client_golang/prometheus/testutil"
 	dto "github.com/prometheus/client_model/go"
@@ -34,6 +36,31 @@ func (c *delayedRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, [
 
 func (c *delayedRateLimitCache) Flush() {}
 
+type panicRateLimitCache struct{}
+
+var _ limiter.RateLimitCache = (*panicRateLimitCache)(nil)
+
+func (c *panicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {
+	panic("EOF")
+}
+
+func (c *panicRateLimitCache) Flush() {}
+
+type delayedPanicRateLimitCache struct {
+	delay    time.Duration
+	panicked chan struct{}
+}
+
+var _ limiter.RateLimitCache = (*delayedPanicRateLimitCache)(nil)
+
+func (c *delayedPanicRateLimitCache) DoLimit(context.Context, *pb.RateLimitRequest, []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {
+	defer close(c.panicked)
+	time.Sleep(c.delay)
+	panic("EOF")
+}
+
+func (c *delayedPanicRateLimitCache) Flush() {}
+
 func histogramSampleCount(t *testing.T, hv *prometheus.HistogramVec, labels ...string) uint64 {
 	t.Helper()
 	obs, err := hv.GetMetricWithLabelValues(labels...)
@@ -71,6 +98,12 @@ func newTestBudget(t *testing.T) *RateLimiterBudget {
 	return budget
 }
 
+func newTestRequestWithAgent() *common.NormalizedRequest {
+	req := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":1,"method":"eth_test"}`))
+	req.EnrichFromHttp(http.Header{"User-Agent": []string{"curl/8.0.1"}}, nil, common.UserAgentTrackingModeSimplified)
+	return req
+}
+
 func TestRateLimiterBudget_PermitTimingMetrics_Ok(t *testing.T) {
 	budget := newTestBudget(t)
 
@@ -118,8 +151,8 @@ func TestRateLimiterBudget_PermitTimingMetrics_Ok(t *testing.T) {
 func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) {
 	budget := newTestBudget(t)
 
-	telemetry.MetricNetworkAttemptReasonTotal.Reset()
 	projectID := "project-a"
+	req := newTestRequestWithAgent()
 	budget.maxTimeout = 10 * time.Millisecond
 	budget.registry.cacheMu.Lock()
 	budget.registry.envoyCache = &delayedRateLimitCache{
@@ -145,17 +178,17 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) {
 		"timeout_fail_open",
 	)
 	beforeFailOpen := promUtil.ToFloat64(
-		telemetry.MetricNetworkAttemptReasonTotal.WithLabelValues(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
 			projectID,
 			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
 			"eth_test",
-			telemetry.AttemptReasonFailOpen,
-			telemetry.MetricsVariantLabel(),
-			telemetry.MetricsReleaseLabel(),
+			"limit_timeout",
 		),
 	)
-
-	ok, err := budget.TryAcquirePermit(context.Background(), projectID, nil, "eth_test", "", "", "", "")
+	ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "")
 	require.NoError(t, err)
 	assert.True(t, ok)
 
@@ -176,19 +209,231 @@ func TestRateLimiterBudget_PermitTimingMetrics_TimeoutFailOpen(t *testing.T) {
 		"timeout_fail_open",
 	)
 	afterFailOpen := promUtil.ToFloat64(
-		telemetry.MetricNetworkAttemptReasonTotal.WithLabelValues(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
 			projectID,
 			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
 			"eth_test",
-			telemetry.AttemptReasonFailOpen,
-			telemetry.MetricsVariantLabel(),
-			telemetry.MetricsReleaseLabel(),
+			"limit_timeout",
 		),
 	)
+	assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1))
+	assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1))
+	assert.Equal(t, beforeFailOpen+1, afterFailOpen)
+}
+
+func TestRateLimiterBudget_PermitTimingMetrics_PanicFailOpen(t *testing.T) {
+	budget := newTestBudget(t)
+	projectID := "project-a"
+	req := newTestRequestWithAgent()
+	budget.registry.cfg.Store = &common.RateLimitStoreConfig{
+		Driver: "redis",
+		Redis:  &common.RedisConnectorConfig{URI: "redis://test"},
+	}
+	budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{
+		TaskTimeout:   time.Second,
+		AutoRetry:     false,
+		RetryFactor:   1,
+		RetryMinDelay: time.Second,
+		RetryMaxDelay: time.Second,
+	})
+	require.NoError(t, budget.registry.initializer.ExecuteTasks(
+		context.Background(),
+		util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }),
+	))
+	budget.registry.cacheMu.Lock()
+	budget.registry.envoyCache = &panicRateLimitCache{}
+	budget.registry.cacheMu.Unlock()
+
+	beforeEval := histogramSampleCount(
+		t,
+		telemetry.MetricRateLimiterPermitEvaluationDuration,
+		"test-budget",
+		"eth_test",
+		"",
+		"panic_fail_open",
+	)
+	beforeWait := histogramSampleCount(
+		t,
+		telemetry.MetricRateLimiterPermitWaitDuration,
+		"test-budget",
+		"eth_test",
+		"",
+		"panic_fail_open",
+	)
+	beforeFailOpen := promUtil.ToFloat64(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+			projectID,
+			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
+			"eth_test",
+			"limit_panic",
+		),
+	)
+	beforeUnexpectedPanic := promUtil.ToFloat64(
+		telemetry.MetricUnexpectedPanicTotal.WithLabelValues(
+			"ratelimiter-do-limit",
+			"budget:test-budget",
+			common.ErrorFingerprint("EOF"),
+		),
+	)
+	ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "")
+	require.NoError(t, err)
+	assert.True(t, ok)
+	assert.Nil(t, budget.registry.GetCache())
+
+	status := budget.registry.initializer.Status()
+	require.Len(t, status.Tasks, 1)
+	assert.Equal(t, util.TaskFailed, status.Tasks[0].State)
+	assert.Error(t, status.Tasks[0].Err)
+
+	afterEval := histogramSampleCount(
+		t,
+		telemetry.MetricRateLimiterPermitEvaluationDuration,
+		"test-budget",
+		"eth_test",
+		"",
+		"panic_fail_open",
+	)
+	afterWait := histogramSampleCount(
+		t,
+		telemetry.MetricRateLimiterPermitWaitDuration,
+		"test-budget",
+		"eth_test",
+		"",
+		"panic_fail_open",
+	)
+	afterFailOpen := promUtil.ToFloat64(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+			projectID,
+			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
+			"eth_test",
+			"limit_panic",
+		),
+	)
+	afterUnexpectedPanic := promUtil.ToFloat64(
+		telemetry.MetricUnexpectedPanicTotal.WithLabelValues(
+			"ratelimiter-do-limit",
+			"budget:test-budget",
+			common.ErrorFingerprint("EOF"),
+		),
+	)
 	assert.GreaterOrEqual(t, afterEval-beforeEval, uint64(1))
 	assert.GreaterOrEqual(t, afterWait-beforeWait, uint64(1))
 	assert.Equal(t, beforeFailOpen+1, afterFailOpen)
+	assert.Equal(t, beforeUnexpectedPanic+1, afterUnexpectedPanic)
+}
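// Reviewer note: these tests assert metric *deltas* (before/after) rather than
// absolute values, so they stay order-independent on the shared global
// registry. The technique in isolation, with a free-standing counter rather
// than an erpc metric:
//
//	c := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "demo_total"}, []string{"reason"})
//	before := testutil.ToFloat64(c.WithLabelValues("limit_panic"))
//	c.WithLabelValues("limit_panic").Inc() // the code under test would do this
//	after := testutil.ToFloat64(c.WithLabelValues("limit_panic"))
//	assert.Equal(t, before+1, after)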
+
+func TestRateLimiterBudget_PermitTimingMetrics_PanicBeforeTimeoutFailOpen(t *testing.T) {
+	budget := newTestBudget(t)
+	projectID := "project-a"
+	req := newTestRequestWithAgent()
+	budget.registry.cfg.Store = &common.RateLimitStoreConfig{
+		Driver: "redis",
+		Redis:  &common.RedisConnectorConfig{URI: "redis://test"},
+	}
+	budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{
+		TaskTimeout:   time.Second,
+		AutoRetry:     false,
+		RetryFactor:   1,
+		RetryMinDelay: time.Second,
+		RetryMaxDelay: time.Second,
+	})
+	require.NoError(t, budget.registry.initializer.ExecuteTasks(
+		context.Background(),
+		util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }),
+	))
+	budget.maxTimeout = 100 * time.Millisecond
+	budget.registry.cacheMu.Lock()
+	budget.registry.envoyCache = &panicRateLimitCache{}
+	budget.registry.cacheMu.Unlock()
+
+	beforeFailOpen := promUtil.ToFloat64(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+			projectID,
+			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
+			"eth_test",
+			"limit_panic",
+		),
+	)
+
+	ok, err := budget.TryAcquirePermit(context.Background(), projectID, req, "eth_test", "", "", "", "")
+	require.NoError(t, err)
+	assert.True(t, ok)
+	assert.Nil(t, budget.registry.GetCache())
+
+	afterFailOpen := promUtil.ToFloat64(
+		telemetry.MetricRateLimiterFailopenTotal.WithLabelValues(
+			projectID,
+			"n/a",
+			"n/a",
+			"curl",
+			"test-budget",
+			"eth_test",
+			"limit_panic",
+		),
+	)
+	assert.Equal(t, beforeFailOpen+1, afterFailOpen)
+}
+
+func TestRateLimiterBudget_LatePanicFromStaleCacheDoesNotClearHealthyReconnect(t *testing.T) {
+	budget := newTestBudget(t)
+	budget.registry.cfg.Store = &common.RateLimitStoreConfig{
+		Driver: "redis",
+		Redis:  &common.RedisConnectorConfig{URI: "redis://test"},
+	}
+	budget.registry.initializer = util.NewInitializer(context.Background(), budget.logger, &util.InitializerConfig{
+		TaskTimeout:   time.Second,
+		AutoRetry:     false,
+		RetryFactor:   1,
+		RetryMinDelay: time.Second,
+		RetryMaxDelay: time.Second,
+	})
+	require.NoError(t, budget.registry.initializer.ExecuteTasks(
+		context.Background(),
+		util.NewBootstrapTask(redisRateLimiterConnectTaskName, func(context.Context) error { return nil }),
+	))
+
+	staleCache := &delayedPanicRateLimitCache{
+		delay:    40 * time.Millisecond,
+		panicked: make(chan struct{}),
+	}
+	healthyCache := &delayedRateLimitCache{statuses: []*pb.RateLimitResponse_DescriptorStatus{}}
+	budget.maxTimeout = 10 * time.Millisecond
+
+	budget.registry.cacheMu.Lock()
+	budget.registry.envoyCache = staleCache
+	budget.registry.cacheMu.Unlock()
+
+	ok, err := budget.TryAcquirePermit(context.Background(), "project-a", nil, "eth_test", "", "", "", "")
+	require.NoError(t, err)
+	assert.True(t, ok)
+
+	budget.registry.cacheMu.Lock()
+	budget.registry.envoyCache = healthyCache
+	budget.registry.cacheMu.Unlock()
+
+	select {
+	case <-staleCache.panicked:
+	case <-time.After(500 * time.Millisecond):
+		t.Fatal("timed out waiting for stale cache panic")
+	}
+
+	assert.Same(t, healthyCache, budget.registry.GetCache())
+
+	status := budget.registry.initializer.Status()
+	require.Len(t, status.Tasks, 1)
+	assert.Equal(t, util.TaskSucceeded, status.Tasks[0].State)
 }
 
 func TestNormalizeRateLimitMethodLabel(t *testing.T) {
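// Reviewer note: the late-panic test above needs to observe a panic on a
// goroutine the test never joins. The fake cache closes a channel in a defer,
// which runs even while the panic unwinds, giving the test a race-free signal
// to wait on. The same idea in isolation (a recover is added here so the demo
// exits cleanly; the production path recovers inside doLimitSafely):

package main

import (
	"fmt"
	"time"
)

func main() {
	panicked := make(chan struct{})
	go func() {
		defer func() { recover() }() // swallow the panic for the demo
		defer close(panicked)        // defers run LIFO: this fires first, during unwinding
		panic("EOF")
	}()
	select {
	case <-panicked:
		fmt.Println("observed background panic")
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}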
diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go
index 6afdb48fa..7c96c175d 100644
--- a/upstream/ratelimiter_registry.go
+++ b/upstream/ratelimiter_registry.go
@@ -34,6 +34,8 @@ type RateLimitersRegistry struct {
 	initializer *util.Initializer
 }
 
+const redisRateLimiterConnectTaskName = "redis-ratelimiter-connect"
+
 func NewRateLimitersRegistry(appCtx context.Context, cfg *common.RateLimiterConfig, logger *zerolog.Logger) (*RateLimitersRegistry, error) {
 	r := &RateLimitersRegistry{
 		appCtx: appCtx,
@@ -60,7 +62,7 @@ func (r *RateLimitersRegistry) bootstrap() error {
 	r.initializer = util.NewInitializer(r.appCtx, r.logger, nil)
 
 	// Attempt Redis connection with panic recovery - don't block startup
-	connectTask := util.NewBootstrapTask("redis-ratelimiter-connect", r.connectRedisTask)
+	connectTask := util.NewBootstrapTask(redisRateLimiterConnectTaskName, r.connectRedisTask)
 	if err := r.initializer.ExecuteTasks(r.appCtx, connectTask); err != nil {
 		// Cache stays nil - rate limiting will fail-open until Redis connects
 		r.logger.Warn().Err(err).Msg("failed to initialize Redis rate limiter on first attempt (rate limiting will fail-open until connected, retrying in background)")
@@ -219,6 +221,28 @@ func (r *RateLimitersRegistry) GetBudgets() []*common.RateLimitBudgetConfig {
 	return r.cfg.Budgets
 }
 
+func (r *RateLimitersRegistry) onCacheFailure(failedCache limiter.RateLimitCache, err error) {
+	if failedCache == nil {
+		return
+	}
+
+	r.cacheMu.Lock()
+	if r.envoyCache != failedCache {
+		r.cacheMu.Unlock()
+		return
+	}
+	r.envoyCache = nil
+	r.cacheMu.Unlock()
+
+	if r.cfg != nil && r.cfg.Store != nil && r.cfg.Store.Driver == "redis" && r.initializer != nil {
+		r.logger.Warn().Err(err).Msg("cleared Redis rate limiter cache after failure, marking for reconnection")
+		r.initializer.MarkTaskAsFailed(redisRateLimiterConnectTaskName, err)
+		return
+	}
+
+	r.logger.Warn().Err(err).Msg("cleared rate limiter cache after failure (rate limiting disabled until restart)")
+}
+
 // AdjustBudget updates MaxCount for all rules in a budget matching a method (supports wildcards via GetRulesByMethod).
 func (r *RateLimitersRegistry) AdjustBudget(budgetId string, method string, newMax uint32) error {
 	if budgetId == "" || method == "" {
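// Reviewer note: onCacheFailure only clears the cache when the pointer that
// failed is still the pointer currently installed, so a late failure reported
// by a stale cache cannot wipe out a healthy replacement (the property the
// LatePanicFromStaleCache test exercises). A minimal sketch of that
// compare-and-clear guard, with illustrative types rather than erpc's:

package main

import (
	"fmt"
	"sync"
)

type cacheHolder struct {
	mu    sync.Mutex
	cache *int
}

// clearIfCurrent nils the slot only when failed is the installed instance.
func (h *cacheHolder) clearIfCurrent(failed *int) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.cache != failed {
		return false // stale failure: a newer cache is already in place
	}
	h.cache = nil
	return true
}

func main() {
	oldCache, newCache := new(int), new(int)
	h := &cacheHolder{cache: newCache}
	fmt.Println(h.clearIfCurrent(oldCache)) // false: stale report, healthy cache kept
	fmt.Println(h.clearIfCurrent(newCache)) // true: current instance cleared
}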