diff --git a/clients/http_json_rpc_client.go b/clients/http_json_rpc_client.go index b9bb2e5ff..d64bb37ef 100644 --- a/clients/http_json_rpc_client.go +++ b/clients/http_json_rpc_client.go @@ -91,14 +91,29 @@ func NewGenericHttpJsonRpcClient( // Default fallback transport (no proxy) // Optimized for high-latency, high-RPS scenarios to prevent connection churn + var timeouts *common.HTTPClientTimeouts + if jsonRpcCfg != nil { + timeouts = &jsonRpcCfg.HTTPClientTimeouts + } + resolved := timeouts.Resolve() + + logger.Debug(). + Dur("timeout", resolved.Timeout). + Dur("responseHeaderTimeout", resolved.ResponseHeaderTimeout). + Dur("tlsHandshakeTimeout", resolved.TLSHandshakeTimeout). + Dur("idleConnTimeout", resolved.IdleConnTimeout). + Dur("expectContinueTimeout", resolved.ExpectContinueTimeout). + Str("upstreamId", upstream.Id()). + Msg("creating HTTP client with timeout configuration") + transport := &http.Transport{ MaxIdleConns: 1024, MaxIdleConnsPerHost: 256, MaxConnsPerHost: 0, // Unlimited active connections (prevents bottleneck) - IdleConnTimeout: 90 * time.Second, - ResponseHeaderTimeout: 30 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, + IdleConnTimeout: resolved.IdleConnTimeout, + ResponseHeaderTimeout: resolved.ResponseHeaderTimeout, + TLSHandshakeTimeout: resolved.TLSHandshakeTimeout, + ExpectContinueTimeout: resolved.ExpectContinueTimeout, } if util.IsTest() { @@ -107,7 +122,7 @@ func NewGenericHttpJsonRpcClient( } } else { client.httpClient = &http.Client{ - Timeout: 60 * time.Second, + Timeout: resolved.Timeout, Transport: transport, } } @@ -203,14 +218,27 @@ func (c *GenericHttpJsonRpcClient) shutdown() { func (c *GenericHttpJsonRpcClient) getHttpClient() *http.Client { if c.proxyPool != nil { client, err := c.proxyPool.GetClient() - if c.isLogLevelTrace { - proxy, _ := client.Transport.(*http.Transport).Proxy(nil) - c.logger.Trace().Str("proxyPool", c.proxyPool.ID).Str("ptr", fmt.Sprintf("%p", client.Transport)).Str("proxy", proxy.String()).Msgf("using client from proxy pool") - } if err != nil { - c.logger.Error().Err(err).Msgf("failed to get client from proxy pool") + c.logger.Error(). + Err(err). + Str("proxyPool", c.proxyPool.ID). + Str("upstreamId", c.upstream.Id()). + Bool("fallbackToDirectConnection", true). + Msg("failed to get client from proxy pool, falling back to direct connection") return c.httpClient } + if c.isLogLevelTrace { + if transport, ok := client.Transport.(*http.Transport); ok && transport != nil { + proxy, proxyErr := transport.Proxy(nil) + if proxyErr == nil && proxy != nil { + c.logger.Trace(). + Str("proxyPool", c.proxyPool.ID). + Str("ptr", fmt.Sprintf("%p", client.Transport)). + Str("proxy", proxy.String()). + Msg("using client from proxy pool") + } + } + } return client } diff --git a/clients/proxy_pool_registry.go b/clients/proxy_pool_registry.go index 27bd7749c..33b244373 100644 --- a/clients/proxy_pool_registry.go +++ b/clients/proxy_pool_registry.go @@ -5,7 +5,6 @@ import ( "net/http" "net/url" "sync/atomic" - "time" "github.com/erpc/erpc/common" "github.com/rs/zerolog" @@ -55,10 +54,17 @@ func NewProxyPoolRegistry( return nil, err } r.pools[poolCfg.ID] = pool + + resolved := poolCfg.HTTPClientTimeouts.Resolve() logger.Debug(). Str("poolId", poolCfg.ID). Int("clientCount", len(pool.clients)). - Msg("proxy pool created") + Dur("timeout", resolved.Timeout). + Dur("responseHeaderTimeout", resolved.ResponseHeaderTimeout). + Dur("tlsHandshakeTimeout", resolved.TLSHandshakeTimeout). 
+ Dur("idleConnTimeout", resolved.IdleConnTimeout). + Dur("expectContinueTimeout", resolved.ExpectContinueTimeout). + Msg("proxy pool created with timeout configuration") } return r, nil @@ -70,6 +76,7 @@ func createProxyPool(poolCfg common.ProxyPoolConfig) (*ProxyPool, error) { return &ProxyPool{ID: poolCfg.ID}, fmt.Errorf("no proxy URLs defined for pool '%s'. at least one proxy URL is required", poolCfg.ID) } + resolved := poolCfg.HTTPClientTimeouts.Resolve() clients := make([]*http.Client, 0, len(poolCfg.Urls)) for _, proxyStr := range poolCfg.Urls { @@ -82,14 +89,14 @@ func createProxyPool(poolCfg common.ProxyPoolConfig) (*ProxyPool, error) { MaxIdleConns: 1024, MaxIdleConnsPerHost: 256, MaxConnsPerHost: 0, // Unlimited active connections (prevents bottleneck) - IdleConnTimeout: 90 * time.Second, - ResponseHeaderTimeout: 30 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, + IdleConnTimeout: resolved.IdleConnTimeout, + ResponseHeaderTimeout: resolved.ResponseHeaderTimeout, + TLSHandshakeTimeout: resolved.TLSHandshakeTimeout, + ExpectContinueTimeout: resolved.ExpectContinueTimeout, Proxy: http.ProxyURL(proxyURL), } client := &http.Client{ - Timeout: 60 * time.Second, + Timeout: resolved.Timeout, Transport: transport, } clients = append(clients, client) diff --git a/common/config.go b/common/config.go index cc27f4991..145e28351 100644 --- a/common/config.go +++ b/common/config.go @@ -786,6 +786,139 @@ func (c *RateLimitAutoTuneConfig) Copy() *RateLimitAutoTuneConfig { return copied } +// HTTPClientTimeouts contains HTTP client timeout configuration fields. +// These are shared between JsonRpcUpstreamConfig and ProxyPoolConfig. +type HTTPClientTimeouts struct { + // Timeout is the total time limit for a request including connection, headers, and body. + // Default: 60s + Timeout Duration `yaml:"timeout,omitempty" json:"timeout" tstype:"Duration"` + + // ResponseHeaderTimeout specifies the time to wait for a server's response headers + // after fully writing the request. This does not include the time to read the response body. + // Default: 30s + ResponseHeaderTimeout Duration `yaml:"responseHeaderTimeout,omitempty" json:"responseHeaderTimeout" tstype:"Duration"` + + // TLSHandshakeTimeout specifies the maximum time waiting for a TLS handshake. + // Default: 10s + TLSHandshakeTimeout Duration `yaml:"tlsHandshakeTimeout,omitempty" json:"tlsHandshakeTimeout" tstype:"Duration"` + + // IdleConnTimeout is the maximum time an idle connection will remain idle before closing. + // Default: 90s + IdleConnTimeout Duration `yaml:"idleConnTimeout,omitempty" json:"idleConnTimeout" tstype:"Duration"` + + // ExpectContinueTimeout specifies the time to wait for a server's first response headers + // after fully writing the request headers if the request has an "Expect: 100-continue" header. + // Default: 1s + ExpectContinueTimeout Duration `yaml:"expectContinueTimeout,omitempty" json:"expectContinueTimeout" tstype:"Duration"` +} + +// Default timeout values for HTTP clients +const ( + DefaultHTTPClientTimeout = 60 * time.Second + DefaultResponseHeaderTimeout = 30 * time.Second + DefaultTLSHandshakeTimeout = 10 * time.Second + DefaultIdleConnTimeout = 90 * time.Second + DefaultExpectContinueTimeout = 1 * time.Second +) + +// ResolvedHTTPClientTimeouts contains the resolved timeout values with defaults applied. 
+type ResolvedHTTPClientTimeouts struct { + Timeout time.Duration + ResponseHeaderTimeout time.Duration + TLSHandshakeTimeout time.Duration + IdleConnTimeout time.Duration + ExpectContinueTimeout time.Duration +} + +// Resolve applies default values to any unset timeout fields and returns resolved timeouts. +func (t *HTTPClientTimeouts) Resolve() ResolvedHTTPClientTimeouts { + if t == nil { + return ResolvedHTTPClientTimeouts{ + Timeout: DefaultHTTPClientTimeout, + ResponseHeaderTimeout: DefaultResponseHeaderTimeout, + TLSHandshakeTimeout: DefaultTLSHandshakeTimeout, + IdleConnTimeout: DefaultIdleConnTimeout, + ExpectContinueTimeout: DefaultExpectContinueTimeout, + } + } + return ResolvedHTTPClientTimeouts{ + Timeout: t.Timeout.WithDefault(DefaultHTTPClientTimeout), + ResponseHeaderTimeout: t.ResponseHeaderTimeout.WithDefault(DefaultResponseHeaderTimeout), + TLSHandshakeTimeout: t.TLSHandshakeTimeout.WithDefault(DefaultTLSHandshakeTimeout), + IdleConnTimeout: t.IdleConnTimeout.WithDefault(DefaultIdleConnTimeout), + ExpectContinueTimeout: t.ExpectContinueTimeout.WithDefault(DefaultExpectContinueTimeout), + } +} + +// MergeFrom fills unset (zero value) timeout fields with values from defaults. +// This mutates the receiver in-place. +func (t *HTTPClientTimeouts) MergeFrom(defaults *HTTPClientTimeouts) { + if defaults == nil { + return + } + if t.Timeout == 0 && defaults.Timeout != 0 { + t.Timeout = defaults.Timeout + } + if t.ResponseHeaderTimeout == 0 && defaults.ResponseHeaderTimeout != 0 { + t.ResponseHeaderTimeout = defaults.ResponseHeaderTimeout + } + if t.TLSHandshakeTimeout == 0 && defaults.TLSHandshakeTimeout != 0 { + t.TLSHandshakeTimeout = defaults.TLSHandshakeTimeout + } + if t.IdleConnTimeout == 0 && defaults.IdleConnTimeout != 0 { + t.IdleConnTimeout = defaults.IdleConnTimeout + } + if t.ExpectContinueTimeout == 0 && defaults.ExpectContinueTimeout != 0 { + t.ExpectContinueTimeout = defaults.ExpectContinueTimeout + } +} + +// Validate checks that all configured timeout values are valid (positive durations). +// Zero values are allowed as they indicate "use default". +// Also validates that sub-timeouts (responseHeaderTimeout, tlsHandshakeTimeout, +// expectContinueTimeout) do not exceed the total request timeout. 
+func (t *HTTPClientTimeouts) Validate(prefix string) error { + if t == nil { + return nil + } + + // Validate individual timeout values - they must be positive if set + if t.Timeout != 0 && t.Timeout.Duration() <= 0 { + return fmt.Errorf("%stimeout must be a positive duration, got: %v", prefix, t.Timeout) + } + if t.ResponseHeaderTimeout != 0 && t.ResponseHeaderTimeout.Duration() <= 0 { + return fmt.Errorf("%sresponseHeaderTimeout must be a positive duration, got: %v", prefix, t.ResponseHeaderTimeout) + } + if t.TLSHandshakeTimeout != 0 && t.TLSHandshakeTimeout.Duration() <= 0 { + return fmt.Errorf("%stlsHandshakeTimeout must be a positive duration, got: %v", prefix, t.TLSHandshakeTimeout) + } + if t.IdleConnTimeout != 0 && t.IdleConnTimeout.Duration() <= 0 { + return fmt.Errorf("%sidleConnTimeout must be a positive duration, got: %v", prefix, t.IdleConnTimeout) + } + if t.ExpectContinueTimeout != 0 && t.ExpectContinueTimeout.Duration() <= 0 { + return fmt.Errorf("%sexpectContinueTimeout must be a positive duration, got: %v", prefix, t.ExpectContinueTimeout) + } + + // Validate timeout relationships + if t.Timeout != 0 && t.ResponseHeaderTimeout != 0 && + t.ResponseHeaderTimeout.Duration() > t.Timeout.Duration() { + return fmt.Errorf("%sresponseHeaderTimeout (%v) cannot exceed timeout (%v)", + prefix, t.ResponseHeaderTimeout, t.Timeout) + } + if t.Timeout != 0 && t.TLSHandshakeTimeout != 0 && + t.TLSHandshakeTimeout.Duration() > t.Timeout.Duration() { + return fmt.Errorf("%stlsHandshakeTimeout (%v) cannot exceed timeout (%v)", + prefix, t.TLSHandshakeTimeout, t.Timeout) + } + if t.Timeout != 0 && t.ExpectContinueTimeout != 0 && + t.ExpectContinueTimeout.Duration() > t.Timeout.Duration() { + return fmt.Errorf("%sexpectContinueTimeout (%v) cannot exceed timeout (%v)", + prefix, t.ExpectContinueTimeout, t.Timeout) + } + + return nil +} + type JsonRpcUpstreamConfig struct { SupportsBatch *bool `yaml:"supportsBatch,omitempty" json:"supportsBatch"` BatchMaxSize int `yaml:"batchMaxSize,omitempty" json:"batchMaxSize"` @@ -793,6 +926,9 @@ type JsonRpcUpstreamConfig struct { EnableGzip *bool `yaml:"enableGzip,omitempty" json:"enableGzip"` Headers map[string]string `yaml:"headers,omitempty" json:"headers"` ProxyPool string `yaml:"proxyPool,omitempty" json:"proxyPool"` + + // HTTP client timeout settings (optional, with sensible defaults) + HTTPClientTimeouts `yaml:",inline" json:",inline"` } func (c *JsonRpcUpstreamConfig) Copy() *JsonRpcUpstreamConfig { @@ -1385,6 +1521,9 @@ func (c *Config) HasRateLimiterBudget(id string) bool { type ProxyPoolConfig struct { ID string `yaml:"id" json:"id"` Urls []string `yaml:"urls" json:"urls"` + + // HTTP client timeout settings (optional, with sensible defaults) + HTTPClientTimeouts `yaml:",inline" json:",inline"` } type DeprecatedProjectHealthCheckConfig struct { diff --git a/common/config_test.go b/common/config_test.go index 361450ee8..39349ca2c 100644 --- a/common/config_test.go +++ b/common/config_test.go @@ -838,3 +838,376 @@ projects: assert.Equal(t, 5, network.Failsafe[1].Retry.MaxAttempts) }) } + +func TestHTTPClientTimeouts_Validate(t *testing.T) { + t.Run("nil timeouts should pass validation", func(t *testing.T) { + var timeouts *HTTPClientTimeouts + err := timeouts.Validate("") + assert.NoError(t, err) + }) + + t.Run("zero values should pass validation (use defaults)", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{} + err := timeouts.Validate("") + assert.NoError(t, err) + }) + + t.Run("positive values should pass validation", func(t 
*testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + ResponseHeaderTimeout: Duration(30 * time.Second), + TLSHandshakeTimeout: Duration(10 * time.Second), + IdleConnTimeout: Duration(90 * time.Second), + ExpectContinueTimeout: Duration(1 * time.Second), + } + err := timeouts.Validate("") + assert.NoError(t, err) + }) + + t.Run("negative timeout should fail validation", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(-30 * time.Second), + } + err := timeouts.Validate("test: ") + assert.Error(t, err) + assert.Contains(t, err.Error(), "test: timeout must be a positive duration") + }) + + t.Run("negative responseHeaderTimeout should fail validation", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + ResponseHeaderTimeout: Duration(-10 * time.Second), + } + err := timeouts.Validate("") + assert.Error(t, err) + assert.Contains(t, err.Error(), "responseHeaderTimeout must be a positive duration") + }) + + t.Run("responseHeaderTimeout exceeding timeout should fail validation", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(30 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + } + err := timeouts.Validate("") + assert.Error(t, err) + assert.Contains(t, err.Error(), "responseHeaderTimeout") + assert.Contains(t, err.Error(), "cannot exceed timeout") + }) + + t.Run("tlsHandshakeTimeout exceeding timeout should fail validation", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(5 * time.Second), + TLSHandshakeTimeout: Duration(10 * time.Second), + } + err := timeouts.Validate("") + assert.Error(t, err) + assert.Contains(t, err.Error(), "tlsHandshakeTimeout") + assert.Contains(t, err.Error(), "cannot exceed timeout") + }) + + t.Run("valid relationship (responseHeaderTimeout < timeout) should pass", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + ResponseHeaderTimeout: Duration(30 * time.Second), + } + err := timeouts.Validate("") + assert.NoError(t, err) + }) +} + +func TestHTTPClientTimeouts_Resolve(t *testing.T) { + t.Run("nil timeouts should return defaults", func(t *testing.T) { + var timeouts *HTTPClientTimeouts + resolved := timeouts.Resolve() + assert.Equal(t, DefaultHTTPClientTimeout, resolved.Timeout) + assert.Equal(t, DefaultResponseHeaderTimeout, resolved.ResponseHeaderTimeout) + assert.Equal(t, DefaultTLSHandshakeTimeout, resolved.TLSHandshakeTimeout) + assert.Equal(t, DefaultIdleConnTimeout, resolved.IdleConnTimeout) + assert.Equal(t, DefaultExpectContinueTimeout, resolved.ExpectContinueTimeout) + }) + + t.Run("zero values should return defaults", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{} + resolved := timeouts.Resolve() + assert.Equal(t, DefaultHTTPClientTimeout, resolved.Timeout) + assert.Equal(t, DefaultResponseHeaderTimeout, resolved.ResponseHeaderTimeout) + }) + + t.Run("configured values should override defaults", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + } + resolved := timeouts.Resolve() + assert.Equal(t, 120*time.Second, resolved.Timeout) + assert.Equal(t, 60*time.Second, resolved.ResponseHeaderTimeout) + // Unset values should still use defaults + assert.Equal(t, DefaultTLSHandshakeTimeout, resolved.TLSHandshakeTimeout) + }) +} + +func TestHTTPClientTimeouts_MergeFrom(t *testing.T) { + t.Run("merge from nil defaults should not change values", func(t *testing.T) { + 
timeouts := &HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + } + timeouts.MergeFrom(nil) + assert.Equal(t, Duration(60*time.Second), timeouts.Timeout) + }) + + t.Run("zero values should be filled from defaults", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{} + defaults := &HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + } + timeouts.MergeFrom(defaults) + assert.Equal(t, Duration(120*time.Second), timeouts.Timeout) + assert.Equal(t, Duration(60*time.Second), timeouts.ResponseHeaderTimeout) + }) + + t.Run("existing values should not be overwritten", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(30 * time.Second), + } + defaults := &HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + } + timeouts.MergeFrom(defaults) + assert.Equal(t, Duration(30*time.Second), timeouts.Timeout) // Not overwritten + assert.Equal(t, Duration(60*time.Second), timeouts.ResponseHeaderTimeout) // Merged + }) +} + +func TestHTTPClientTimeouts_Validate_ExpectContinueTimeout(t *testing.T) { + t.Run("expectContinueTimeout exceeding timeout should fail validation", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(5 * time.Second), + ExpectContinueTimeout: Duration(10 * time.Second), + } + err := timeouts.Validate("") + assert.Error(t, err) + assert.Contains(t, err.Error(), "expectContinueTimeout") + assert.Contains(t, err.Error(), "cannot exceed timeout") + }) + + t.Run("valid expectContinueTimeout should pass", func(t *testing.T) { + timeouts := &HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + ExpectContinueTimeout: Duration(1 * time.Second), + } + err := timeouts.Validate("") + assert.NoError(t, err) + }) +} + +func TestDuration_WithDefault(t *testing.T) { + t.Run("zero duration should return default", func(t *testing.T) { + d := Duration(0) + result := d.WithDefault(30 * time.Second) + assert.Equal(t, 30*time.Second, result) + }) + + t.Run("positive duration should return itself", func(t *testing.T) { + d := Duration(60 * time.Second) + result := d.WithDefault(30 * time.Second) + assert.Equal(t, 60*time.Second, result) + }) + + t.Run("negative duration should return default", func(t *testing.T) { + d := Duration(-10 * time.Second) + result := d.WithDefault(30 * time.Second) + assert.Equal(t, 30*time.Second, result) + }) + + t.Run("small positive duration should return itself", func(t *testing.T) { + d := Duration(1 * time.Millisecond) + result := d.WithDefault(30 * time.Second) + assert.Equal(t, 1*time.Millisecond, result) + }) +} + +func TestUpstreamConfig_ApplyDefaults_HTTPClientTimeouts(t *testing.T) { + t.Run("upstream with nil JsonRpc inherits from defaults", func(t *testing.T) { + defaults := &UpstreamConfig{ + JsonRpc: &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + }, + }, + } + + upstream := &UpstreamConfig{ + Endpoint: "http://rpc1.localhost", + } + + err := upstream.ApplyDefaults(defaults) + assert.NoError(t, err) + assert.NotNil(t, upstream.JsonRpc) + assert.Equal(t, Duration(120*time.Second), upstream.JsonRpc.Timeout) + assert.Equal(t, Duration(60*time.Second), upstream.JsonRpc.ResponseHeaderTimeout) + }) + + t.Run("upstream with empty JsonRpc HTTPClientTimeouts inherits from defaults", func(t *testing.T) { + defaults := &UpstreamConfig{ + JsonRpc: &JsonRpcUpstreamConfig{ + 
HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + TLSHandshakeTimeout: Duration(15 * time.Second), + }, + }, + } + + upstream := &UpstreamConfig{ + Endpoint: "http://rpc1.localhost", + JsonRpc: &JsonRpcUpstreamConfig{}, + } + + err := upstream.ApplyDefaults(defaults) + assert.NoError(t, err) + assert.Equal(t, Duration(120*time.Second), upstream.JsonRpc.Timeout) + assert.Equal(t, Duration(60*time.Second), upstream.JsonRpc.ResponseHeaderTimeout) + assert.Equal(t, Duration(15*time.Second), upstream.JsonRpc.TLSHandshakeTimeout) + }) + + t.Run("upstream HTTPClientTimeouts override defaults", func(t *testing.T) { + defaults := &UpstreamConfig{ + JsonRpc: &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + }, + }, + } + + upstream := &UpstreamConfig{ + Endpoint: "http://rpc1.localhost", + JsonRpc: &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(30 * time.Second), // Override + }, + }, + } + + err := upstream.ApplyDefaults(defaults) + assert.NoError(t, err) + assert.Equal(t, Duration(30*time.Second), upstream.JsonRpc.Timeout) // Not overwritten + assert.Equal(t, Duration(60*time.Second), upstream.JsonRpc.ResponseHeaderTimeout) // Inherited + }) + + t.Run("partial timeout inheritance", func(t *testing.T) { + defaults := &UpstreamConfig{ + JsonRpc: &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(120 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + TLSHandshakeTimeout: Duration(15 * time.Second), + IdleConnTimeout: Duration(180 * time.Second), + ExpectContinueTimeout: Duration(2 * time.Second), + }, + }, + } + + upstream := &UpstreamConfig{ + Endpoint: "http://rpc1.localhost", + JsonRpc: &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(30 * time.Second), + TLSHandshakeTimeout: Duration(5 * time.Second), + }, + }, + } + + err := upstream.ApplyDefaults(defaults) + assert.NoError(t, err) + assert.Equal(t, Duration(30*time.Second), upstream.JsonRpc.Timeout) // Not overwritten + assert.Equal(t, Duration(60*time.Second), upstream.JsonRpc.ResponseHeaderTimeout) // Inherited + assert.Equal(t, Duration(5*time.Second), upstream.JsonRpc.TLSHandshakeTimeout) // Not overwritten + assert.Equal(t, Duration(180*time.Second), upstream.JsonRpc.IdleConnTimeout) // Inherited + assert.Equal(t, Duration(2*time.Second), upstream.JsonRpc.ExpectContinueTimeout) // Inherited + }) +} + +func TestJsonRpcUpstreamConfig_Validate_HTTPClientTimeouts(t *testing.T) { + t.Run("valid HTTPClientTimeouts should pass validation", func(t *testing.T) { + cfg := &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + ResponseHeaderTimeout: Duration(30 * time.Second), + }, + } + err := cfg.Validate(&Config{}) + assert.NoError(t, err) + }) + + t.Run("invalid HTTPClientTimeouts should fail validation", func(t *testing.T) { + cfg := &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(-30 * time.Second), + }, + } + err := cfg.Validate(&Config{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "jsonRpc:") + assert.Contains(t, err.Error(), "timeout must be a positive duration") + }) + + t.Run("HTTPClientTimeouts relationship validation", func(t *testing.T) { + cfg := &JsonRpcUpstreamConfig{ + HTTPClientTimeouts: HTTPClientTimeouts{ + 
Timeout: Duration(30 * time.Second), + ResponseHeaderTimeout: Duration(60 * time.Second), + }, + } + err := cfg.Validate(&Config{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "responseHeaderTimeout") + assert.Contains(t, err.Error(), "cannot exceed timeout") + }) +} + +func TestProxyPoolConfig_Validate_HTTPClientTimeouts(t *testing.T) { + t.Run("valid HTTPClientTimeouts should pass validation", func(t *testing.T) { + cfg := &ProxyPoolConfig{ + ID: "test-pool", + Urls: []string{"http://proxy1.example.com:8080"}, + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(60 * time.Second), + ResponseHeaderTimeout: Duration(30 * time.Second), + }, + } + err := cfg.Validate() + assert.NoError(t, err) + }) + + t.Run("invalid HTTPClientTimeouts should fail validation with pool ID", func(t *testing.T) { + cfg := &ProxyPoolConfig{ + ID: "my-proxy-pool", + Urls: []string{"http://proxy1.example.com:8080"}, + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(-30 * time.Second), + }, + } + err := cfg.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "proxyPool 'my-proxy-pool':") + assert.Contains(t, err.Error(), "timeout must be a positive duration") + }) + + t.Run("HTTPClientTimeouts relationship validation", func(t *testing.T) { + cfg := &ProxyPoolConfig{ + ID: "test-pool", + Urls: []string{"http://proxy1.example.com:8080"}, + HTTPClientTimeouts: HTTPClientTimeouts{ + Timeout: Duration(5 * time.Second), + TLSHandshakeTimeout: Duration(10 * time.Second), + }, + } + err := cfg.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "tlsHandshakeTimeout") + assert.Contains(t, err.Error(), "cannot exceed timeout") + }) +} diff --git a/common/defaults.go b/common/defaults.go index 4e726e1f6..1c1b34d6f 100644 --- a/common/defaults.go +++ b/common/defaults.go @@ -1491,13 +1491,35 @@ func (u *UpstreamConfig) ApplyDefaults(defaults *UpstreamConfig) error { } if u.JsonRpc == nil && defaults.JsonRpc != nil { u.JsonRpc = &JsonRpcUpstreamConfig{ - SupportsBatch: defaults.JsonRpc.SupportsBatch, - BatchMaxSize: defaults.JsonRpc.BatchMaxSize, - BatchMaxWait: defaults.JsonRpc.BatchMaxWait, - EnableGzip: defaults.JsonRpc.EnableGzip, - ProxyPool: defaults.JsonRpc.ProxyPool, - Headers: defaults.JsonRpc.Headers, + SupportsBatch: defaults.JsonRpc.SupportsBatch, + BatchMaxSize: defaults.JsonRpc.BatchMaxSize, + BatchMaxWait: defaults.JsonRpc.BatchMaxWait, + EnableGzip: defaults.JsonRpc.EnableGzip, + ProxyPool: defaults.JsonRpc.ProxyPool, + Headers: defaults.JsonRpc.Headers, + HTTPClientTimeouts: defaults.JsonRpc.HTTPClientTimeouts, } + } else if u.JsonRpc != nil && defaults.JsonRpc != nil { + // Merge individual fields from defaults if not set on upstream + if u.JsonRpc.SupportsBatch == nil && defaults.JsonRpc.SupportsBatch != nil { + u.JsonRpc.SupportsBatch = defaults.JsonRpc.SupportsBatch + } + if u.JsonRpc.BatchMaxSize == 0 && defaults.JsonRpc.BatchMaxSize != 0 { + u.JsonRpc.BatchMaxSize = defaults.JsonRpc.BatchMaxSize + } + if u.JsonRpc.BatchMaxWait == 0 && defaults.JsonRpc.BatchMaxWait != 0 { + u.JsonRpc.BatchMaxWait = defaults.JsonRpc.BatchMaxWait + } + if u.JsonRpc.EnableGzip == nil && defaults.JsonRpc.EnableGzip != nil { + u.JsonRpc.EnableGzip = defaults.JsonRpc.EnableGzip + } + if u.JsonRpc.ProxyPool == "" && defaults.JsonRpc.ProxyPool != "" { + u.JsonRpc.ProxyPool = defaults.JsonRpc.ProxyPool + } + if u.JsonRpc.Headers == nil && defaults.JsonRpc.Headers != nil { + u.JsonRpc.Headers = defaults.JsonRpc.Headers + } + 
u.JsonRpc.HTTPClientTimeouts.MergeFrom(&defaults.JsonRpc.HTTPClientTimeouts) } // Integrity moved under Evm.Integrity if u.Evm != nil && defaults.Evm != nil { diff --git a/common/duration.go b/common/duration.go index 014645898..3cfdb3947 100644 --- a/common/duration.go +++ b/common/duration.go @@ -50,3 +50,12 @@ func (d Duration) String() string { func (d Duration) MarshalJSON() ([]byte, error) { return SonicCfg.Marshal(time.Duration(d).String()) } + +// WithDefault returns the duration value if it's positive, otherwise returns the default value. +// This simplifies the common pattern of applying defaults to optional duration configs. +func (d Duration) WithDefault(defaultVal time.Duration) time.Duration { + if d > 0 { + return time.Duration(d) + } + return defaultVal +} diff --git a/common/validation.go b/common/validation.go index b9ae2e782..6607abb50 100644 --- a/common/validation.go +++ b/common/validation.go @@ -243,6 +243,12 @@ func (p *ProxyPoolConfig) Validate() error { return fmt.Errorf("proxyPool.*.urls under proxyPool.*.id '%s' must be valid HTTP, HTTPS, or SOCKS5 URLs, got: %s", p.ID, url) } } + + // Validate HTTP client timeout settings + if err := p.HTTPClientTimeouts.Validate(fmt.Sprintf("proxyPool '%s': ", p.ID)); err != nil { + return err + } + return nil } @@ -1187,6 +1193,12 @@ func (j *JsonRpcUpstreamConfig) Validate(c *Config) error { return fmt.Errorf("jsonRpc.proxyPool '%s' does not exist in configured proxyPools, must be one of: %v", j.ProxyPool, allIds) } } + + // Validate HTTP client timeout settings + if err := j.HTTPClientTimeouts.Validate("jsonRpc: "); err != nil { + return err + } + return nil } diff --git a/erpc/config_analyzer.go b/erpc/config_analyzer.go index c0854052c..5df31d08c 100644 --- a/erpc/config_analyzer.go +++ b/erpc/config_analyzer.go @@ -325,7 +325,7 @@ func GenerateValidationReport(ctx context.Context, cfg *common.Config) *Validati } clReg := clients.NewClientRegistry(&silent, project.Id, prxPool, evm.NewJsonRpcErrorExtractor()) vndReg := thirdparty.NewVendorsRegistry() - rlr, err := upstream.NewRateLimitersRegistry(cfg.RateLimiters, &silent) + rlr, err := upstream.NewRateLimitersRegistry(ctx, cfg.RateLimiters, &silent) if err != nil { appendErr(fmt.Sprintf("project=%s failed to create rate limiters registry: %v", project.Id, err)) continue @@ -806,6 +806,7 @@ func validateUpstreamEndpoints(ctx context.Context, cfg *common.Config, logger z ) vndReg := thirdparty.NewVendorsRegistry() rlr, err := upstream.NewRateLimitersRegistry( + ctx, cfg.RateLimiters, &logger, ) diff --git a/erpc/erpc.go b/erpc/erpc.go index d9b29cb04..b28042bc7 100644 --- a/erpc/erpc.go +++ b/erpc/erpc.go @@ -33,6 +33,7 @@ func NewERPC( } rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + appCtx, cfg.RateLimiters, logger, ) diff --git a/erpc/evm_json_rpc_cache_test.go b/erpc/evm_json_rpc_cache_test.go index 21bed49e0..b49510168 100644 --- a/erpc/evm_json_rpc_cache_test.go +++ b/erpc/evm_json_rpc_cache_test.go @@ -99,7 +99,7 @@ func createCacheTestFixtures(ctx context.Context, upstreamConfigs []upsTestCfg) for _, cfg := range upstreamConfigs { mt := health.NewTracker(&logger, "prjA", 100*time.Second) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &logger) if err != nil { panic(err) } @@ -2622,7 +2622,7 @@ func createMockUpstream(t *testing.T, ctx context.Context, chainId int64, upstre require.NoError(t, err) mt := 
health.NewTracker(&logger, "prjA", 100*time.Second) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &logger) require.NoError(t, err) mockUpstream, err := upstream.NewUpstream(ctx, "test", &common.UpstreamConfig{ @@ -3244,7 +3244,7 @@ func createCacheTestFixturesWithCompression(ctx context.Context, upstreamConfigs for _, cfg := range upstreamConfigs { mt := health.NewTracker(&logger, "prjA", 100*time.Second) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &logger) if err != nil { panic(err) } diff --git a/erpc/networks_availability_test.go b/erpc/networks_availability_test.go index 0a076068d..32195116a 100644 --- a/erpc/networks_availability_test.go +++ b/erpc/networks_availability_test.go @@ -49,7 +49,7 @@ func TestNetworkAvailability_LowerExactBlock_Skip(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -114,7 +114,7 @@ func TestNetworkAvailability_LowerLatestMinus_Skip(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -187,7 +187,7 @@ func TestNetworkAvailability_LowerEarliestPlus_InitAndSkip(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -297,7 +297,7 @@ func TestNetworkAvailability_InvalidRange_FailOpen_AllowsRequest(t *testing.T) { Reply(200). JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x1"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -374,7 +374,7 @@ func TestNetworkAvailability_Window_ExactLowerUpper(t *testing.T) { Reply(200). 
JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x64"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -475,7 +475,7 @@ func TestNetworkAvailability_EarliestPlus_Freeze_NoAdvance(t *testing.T) { return strings.Contains(b, "\"eth_getBlockByNumber\"") && strings.Contains(b, "\"0x3\"") }).Reply(200).JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x3"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -564,7 +564,7 @@ func TestNetworkAvailability_EarliestPlus_UpdateRate_Advance(t *testing.T) { return strings.Contains(b, "\"eth_getBlockByNumber\"") && strings.Contains(b, "\"0x1\"") }).Reply(200).JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x1"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -637,7 +637,7 @@ func TestNetworkAvailability_UnsupportedProbe_FailOpen(t *testing.T) { return strings.Contains(b, "\"eth_getBlockByNumber\"") && strings.Contains(b, "\"0x0\"") }).Reply(200).JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x0"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -718,7 +718,7 @@ func TestNetworkAvailability_UpperEarliestPlus_Enforced(t *testing.T) { return strings.Contains(b, "\"eth_getBlockByNumber\"") && !strings.Contains(b, "\"0x0\"") && !strings.Contains(b, "\"0x1\"") }).Reply(200).JSON([]byte(`{"jsonrpc":"2.0","id":1,"result":{"number":"0x1"}}`)) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -803,7 +803,7 @@ func TestNetworkAvailability_Enforce_Precedence_DefaultDoesNotOverrideMethod(t * }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -882,7 +882,7 @@ func TestNetworkAvailability_Enforce_Precedence_DefaultDoesNotOverrideNetwork(t }, } - rlr, _ := 
upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -953,7 +953,7 @@ func TestNetworkAvailability_Enforce_DefaultFalse_Disables_WhenNoExplicitConfig( }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1019,7 +1019,7 @@ func TestNetworkAvailability_Enforce_NetworkFalse_Disables(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1073,7 +1073,7 @@ func TestCheckUpstreamBlockAvailability_BlockBeyondLatest_ReturnsRetryableError( }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1140,7 +1140,7 @@ func TestCheckUpstreamBlockAvailability_SmallDistance_IsRetryable(t *testing.T) }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1231,7 +1231,7 @@ func TestCheckUpstreamBlockAvailability_ErrorHasCorrectDetails(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1312,7 +1312,7 @@ func TestRetryableBlockUnavailability_NoInfiniteLoop(t *testing.T) { }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) diff --git a/erpc/networks_bench_test.go b/erpc/networks_bench_test.go index 50ffed774..bfa7f097d 100644 --- a/erpc/networks_bench_test.go +++ b/erpc/networks_bench_test.go @@ -36,7 +36,7 @@ func BenchmarkNetworkForward_SimpleSuccess(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err 
:= upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -141,7 +141,7 @@ func BenchmarkNetworkForward_MethodIgnoreCase(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -252,7 +252,7 @@ func BenchmarkNetworkForward_RetryFailures(b *testing.B) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -368,7 +368,7 @@ func BenchmarkNetworkForward_ConcurrentEthGetLogsIntegrityEnabled(b *testing.B) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { diff --git a/erpc/networks_bootstrap_test.go b/erpc/networks_bootstrap_test.go index da028759a..2414c9a2c 100644 --- a/erpc/networks_bootstrap_test.go +++ b/erpc/networks_bootstrap_test.go @@ -52,7 +52,7 @@ func TestNetworksBootstrap_SlowProviderUpstreams_InitializeThenServe(t *testing. }) require.NoError(t, err) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) upr := upstream.NewUpstreamsRegistry( @@ -100,7 +100,7 @@ func TestNetworksBootstrap_UnsupportedNetwork_FatalFast(t *testing.T) { ssr, err := data.NewSharedStateRegistry(ctx, &log.Logger, &common.SharedStateConfig{Connector: &common.ConnectorConfig{Driver: "memory", Memory: &common.MemoryConnectorConfig{MaxItems: 100_000, MaxTotalSize: "1GB"}}}) require.NoError(t, err) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) upr := upstream.NewUpstreamsRegistry(ctx, &log.Logger, "prjA", []*common.UpstreamConfig{}, ssr, rlr, vr, pr, nil, mt, 1*time.Second, nil) @@ -141,7 +141,7 @@ func TestNetworksBootstrap_ProviderInitializing_503Retry(t *testing.T) { ssr, err := data.NewSharedStateRegistry(ctx, &log.Logger, &common.SharedStateConfig{Connector: &common.ConnectorConfig{Driver: "memory", Memory: &common.MemoryConnectorConfig{MaxItems: 100_000, MaxTotalSize: "1GB"}}}) require.NoError(t, err) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) upr := upstream.NewUpstreamsRegistry(ctx, &log.Logger, "prjA", []*common.UpstreamConfig{}, ssr, rlr, vr, pr, nil, mt, 1*time.Second, nil) diff --git a/erpc/networks_earliest_detection_test.go b/erpc/networks_earliest_detection_test.go index 3bf32856c..9ae66faa9 100644 --- a/erpc/networks_earliest_detection_test.go +++ b/erpc/networks_earliest_detection_test.go @@ -61,7 +61,7 @@ func 
TestEarliestDetection_FailOpenWhenNoEarliestConfigured(t *testing.T) { "result": map[string]interface{}{"number": "0x5"}, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -174,7 +174,7 @@ func TestEarliestDetection_BlocksRequestAfterSuccessfulDetection(t *testing.T) { "result": nil, // Pruned }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -259,7 +259,7 @@ func TestEarliestDetection_InitialDetectionAlwaysRunsOnBootstrap(t *testing.T) { }, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -351,7 +351,7 @@ func TestEarliestDetection_SchedulerHandlesPeriodicUpdates(t *testing.T) { }, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -450,7 +450,7 @@ func TestEarliestDetection_InvalidRangeTriggersFailOpen(t *testing.T) { "result": nil, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -589,7 +589,7 @@ func TestEarliestDetection_StaleHighValueInSharedState(t *testing.T) { }, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) diff --git a/erpc/networks_failsafe_test.go b/erpc/networks_failsafe_test.go index b458f24ae..43adeb23c 100644 --- a/erpc/networks_failsafe_test.go +++ b/erpc/networks_failsafe_test.go @@ -725,7 +725,7 @@ func setupTestNetworkWithRetryConfig(t *testing.T, ctx context.Context, directiv }}, } - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) @@ -801,7 +801,7 @@ func setupTestNetworkWithMultipleFailsafePolicies(t *testing.T, ctx 
context.Cont Failsafe: failsafeConfigs, } - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) diff --git a/erpc/networks_forward_test.go b/erpc/networks_forward_test.go index fc6b6c8f1..da52acf41 100644 --- a/erpc/networks_forward_test.go +++ b/erpc/networks_forward_test.go @@ -44,7 +44,7 @@ func TestNetwork_Forward_InfiniteLoopWithAllUpstreamsSkipping(t *testing.T) { }, }, }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, logger) mt := health.NewTracker(logger, "testProject", 2*time.Second) @@ -132,7 +132,7 @@ func TestNetwork_Forward_InfiniteLoopWithAllUpstreamsSkipping(t *testing.T) { }) require.NoError(t, err) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, logger) require.NoError(t, err) mt := health.NewTracker(logger, "testProject", 2*time.Second) @@ -212,7 +212,7 @@ func TestNetwork_Forward_InfiniteLoopWithAllUpstreamsSkipping(t *testing.T) { require.NoError(t, err) // Setup rate limiters registry - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, logger) require.NoError(t, err) @@ -396,7 +396,7 @@ func TestNetwork_Forward_InfiniteLoopWithAllUpstreamsSkipping(t *testing.T) { }) require.NoError(t, err) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, logger) require.NoError(t, err) diff --git a/erpc/networks_hedge_test.go b/erpc/networks_hedge_test.go index d1ffb0445..db045a929 100644 --- a/erpc/networks_hedge_test.go +++ b/erpc/networks_hedge_test.go @@ -968,7 +968,7 @@ func setupTestNetworkWithMultipleUpstreams(t *testing.T, ctx context.Context, nu func setupTestNetwork(t *testing.T, ctx context.Context, upstreamConfigs []*common.UpstreamConfig, networkConfig *common.NetworkConfig) *Network { t.Helper() - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) diff --git a/erpc/networks_integrity_test.go b/erpc/networks_integrity_test.go index 7e43061e0..f9b0fce30 100644 --- a/erpc/networks_integrity_test.go +++ b/erpc/networks_integrity_test.go @@ -50,7 +50,7 @@ func mustHexToBytes(hex string) []byte { // Helper to setup test network for integrity tests func setupIntegrityTestNetwork(t *testing.T, ctx context.Context, upstreams []*common.UpstreamConfig, ntwCfg *common.NetworkConfig) (*Network, *upstream.UpstreamsRegistry) { - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := 
upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) diff --git a/erpc/networks_interpolation_test.go b/erpc/networks_interpolation_test.go index 1e6575d48..d8c3152f7 100644 --- a/erpc/networks_interpolation_test.go +++ b/erpc/networks_interpolation_test.go @@ -39,7 +39,7 @@ func setupTestNetworkForInterpolation(t *testing.T, ctx context.Context, network }, } - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -1936,7 +1936,7 @@ func TestInterpolation_UpstreamSkipping_OnInterpolatedLatest(t *testing.T) { "result": "0x1234", }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) @@ -2054,7 +2054,7 @@ func TestInterpolation_UpstreamSkipping_DisabledByMethodConfig(t *testing.T) { "result": "0x99", }) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) diff --git a/erpc/networks_multiplexer_test.go b/erpc/networks_multiplexer_test.go index eb618f4b2..ee532be3d 100644 --- a/erpc/networks_multiplexer_test.go +++ b/erpc/networks_multiplexer_test.go @@ -387,7 +387,7 @@ func setupTestNetworkForMultiplexer(t *testing.T, ctx context.Context) *Network // No caching to test pure multiplexing } - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) diff --git a/erpc/networks_sendrawtx_test.go b/erpc/networks_sendrawtx_test.go index 1db09ba3c..ffbb258c0 100644 --- a/erpc/networks_sendrawtx_test.go +++ b/erpc/networks_sendrawtx_test.go @@ -1671,7 +1671,7 @@ func setupSendRawTxTestNetworkWithRetryAndHedge(t *testing.T, ctx context.Contex func setupSendRawTxNetwork(t *testing.T, ctx context.Context, upstreamConfigs []*common.UpstreamConfig, networkConfig *common.NetworkConfig) *Network { t.Helper() - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) diff --git a/erpc/networks_test.go b/erpc/networks_test.go index 117662520..c2c57c7d2 100644 --- a/erpc/networks_test.go +++ b/erpc/networks_test.go @@ -45,7 +45,7 @@ func TestNetwork_Forward(t 
*testing.T) { util.SetupMocksForEvmStatePoller() defer util.AssertNoPendingMocks(t, 0) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Store: &common.RateLimitStoreConfig{ Driver: "memory", @@ -212,7 +212,7 @@ func TestNetwork_Forward(t *testing.T) { EmptyResultMaxAttempts: 2, // cap empties at 2 total attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) if err != nil { t.Fatal(err) } @@ -331,7 +331,7 @@ func TestNetwork_Forward(t *testing.T) { fsCfg := &common.FailsafeConfig{ Retry: &common.RetryPolicyConfig{MaxAttempts: 3}, // no EmptyResultMaxAttempts set } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) if err != nil { t.Fatal(err) } @@ -447,7 +447,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) fsCfg := &common.FailsafeConfig{Retry: &common.RetryPolicyConfig{MaxAttempts: 3, EmptyResultMaxAttempts: 2}} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) if err != nil { t.Fatal(err) } @@ -541,7 +541,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) fsCfg := &common.FailsafeConfig{Retry: &common.RetryPolicyConfig{MaxAttempts: 5, EmptyResultMaxAttempts: 2}} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) if err != nil { t.Fatal(err) } @@ -627,7 +627,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) fsCfg := &common.FailsafeConfig{Retry: &common.RetryPolicyConfig{MaxAttempts: 5, EmptyResultMaxAttempts: 4, EmptyResultIgnore: []string{"eth_getBalance"}}} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{Budgets: []*common.RateLimitBudgetConfig{}}, &log.Logger) if err != nil { t.Fatal(err) } @@ -693,7 +693,7 @@ func TestNetwork_Forward(t *testing.T) { util.SetupMocksForEvmStatePoller() defer util.AssertNoPendingMocks(t, 0) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{ { @@ -829,7 +829,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + 
rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -954,7 +954,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -1095,7 +1095,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -1261,7 +1261,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -1418,7 +1418,7 @@ func TestNetwork_Forward(t *testing.T) { // Initialize various components for the test environment clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -1615,7 +1615,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -1825,7 +1825,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -2017,7 +2017,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -2242,7 +2242,7 @@ func TestNetwork_Forward(t *testing.T) { // Set up the test environment clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, []*common.ProviderConfig{}, nil) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) @@ -2380,7 +2380,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := 
upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -2588,7 +2588,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -2778,7 +2778,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -2961,7 +2961,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -3163,7 +3163,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 4, // Allow up to 4 attempts (1 initial + 3 retries) }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -3392,7 +3392,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -3583,7 +3583,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, // Allow up to 3 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -3771,7 +3771,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, // Allow up to 2 retry attempts }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -3952,7 +3952,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) fsCfg := &common.FailsafeConfig{} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4081,7 +4081,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: 
[]*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4219,7 +4219,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4391,7 +4391,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4521,7 +4521,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4623,7 +4623,7 @@ func TestNetwork_Forward(t *testing.T) { util.SetupMocksForEvmStatePoller() defer util.AssertNoPendingMocks(t, 0) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{ { @@ -4769,7 +4769,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 3, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -4899,7 +4899,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 4, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5027,7 +5027,7 @@ func TestNetwork_Forward(t *testing.T) { if err != nil { t.Fatal(err) } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5160,7 +5160,7 @@ func TestNetwork_Forward(t *testing.T) { Duration: common.Duration(1 * time.Second), }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5293,7 +5293,7 @@ func TestNetwork_Forward(t *testing.T) { MaxCount: 1, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5446,7 +5446,7 @@ func TestNetwork_Forward(t *testing.T) { MaxCount: 5, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5597,7 +5597,7 @@ func TestNetwork_Forward(t *testing.T) { MaxCount: 5, }, } - rlr, err := 
upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5755,7 +5755,7 @@ func TestNetwork_Forward(t *testing.T) { }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -5889,7 +5889,7 @@ func TestNetwork_Forward(t *testing.T) { }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6022,7 +6022,7 @@ func TestNetwork_Forward(t *testing.T) { t.Fatal(err) } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6175,7 +6175,7 @@ func TestNetwork_Forward(t *testing.T) { } clr := clients.NewClientRegistry(&log.Logger, "prjA", nil, evm.NewJsonRpcErrorExtractor()) fsCfg := &common.FailsafeConfig{} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6313,7 +6313,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6458,7 +6458,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6586,7 +6586,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6738,7 +6738,7 @@ func TestNetwork_Forward(t *testing.T) { MaxAttempts: 2, }, } - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -6883,7 +6883,7 @@ func TestNetwork_Forward(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -7019,7 +7019,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() fsCfg := &common.FailsafeConfig{} - rlr, err := 
upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -7142,7 +7142,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() fsCfg := &common.FailsafeConfig{} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -7264,7 +7264,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() fsCfg := &common.FailsafeConfig{} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -7380,7 +7380,7 @@ func TestNetwork_Forward(t *testing.T) { metricsTracker.Bootstrap(ctx) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &logger) assert.NoError(t, err) @@ -7566,7 +7566,7 @@ func TestNetwork_Forward(t *testing.T) { defer cancel() fsCfg := &common.FailsafeConfig{} - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { @@ -9236,7 +9236,7 @@ func TestNetwork_EvmGetLogs(t *testing.T) { defer cancel() // Build network with tight best-effort budgets to force fallback - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() pr, err := thirdparty.NewProvidersRegistry(&log.Logger, vr, []*common.ProviderConfig{}, nil) @@ -10170,7 +10170,7 @@ func TestNetwork_ThunderingHerdProtection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 5*time.Second) pollerInterval := 2000 * time.Millisecond @@ -10372,7 +10372,7 @@ func TestNetwork_ThunderingHerdProtection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 2*time.Second) upCfg := &common.UpstreamConfig{ @@ -10557,7 +10557,7 @@ func TestNetwork_ThunderingHerdProtection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rlr, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rlr, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) mt := health.NewTracker(&log.Logger, "prjA", 
2*time.Second) upCfg := &common.UpstreamConfig{ @@ -10655,7 +10655,7 @@ func TestNetwork_ThunderingHerdProtection(t *testing.T) { func setupTestNetworkSimple(t *testing.T, ctx context.Context, upstreamConfig *common.UpstreamConfig, networkConfig *common.NetworkConfig) *Network { t.Helper() - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) if upstreamConfig == nil { @@ -10764,7 +10764,7 @@ func setupTestNetworkWithFullAndArchiveNodeUpstreams( ) *Network { t.Helper() - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) up1 := &common.UpstreamConfig{ @@ -10929,7 +10929,7 @@ func TestNetwork_HighestLatestBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11082,7 +11082,7 @@ func TestNetwork_HighestLatestBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11240,7 +11240,7 @@ func TestNetwork_HighestLatestBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11367,7 +11367,7 @@ func TestNetwork_HighestLatestBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11501,7 +11501,7 @@ func TestNetwork_HighestFinalizedBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11632,7 +11632,7 @@ func TestNetwork_HighestFinalizedBlockNumber(t *testing.T) { Reply(200). 
JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() @@ -11761,7 +11761,7 @@ func TestNetwork_HighestFinalizedBlockNumber(t *testing.T) { Reply(200). JSON([]byte(`{"result":"0x7b"}`)) - rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) vr := thirdparty.NewVendorsRegistry() diff --git a/erpc/policy_evaluator_test.go b/erpc/policy_evaluator_test.go index c6a95a881..599d32c22 100644 --- a/erpc/policy_evaluator_test.go +++ b/erpc/policy_evaluator_test.go @@ -1724,7 +1724,7 @@ func TestPolicyEvaluator(t *testing.T) { } func createTestNetwork(t *testing.T, ctx context.Context) (*Network, *upstream.Upstream, *upstream.Upstream, *upstream.Upstream) { - rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Budgets: []*common.RateLimitBudgetConfig{}, }, &log.Logger) if err != nil { diff --git a/erpc/projects_test.go b/erpc/projects_test.go index 48e966b84..d55297f90 100644 --- a/erpc/projects_test.go +++ b/erpc/projects_test.go @@ -25,7 +25,7 @@ func TestProject_Forward(t *testing.T) { util.SetupMocksForEvmStatePoller() defer util.AssertNoPendingMocks(t, 0) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{ Store: &common.RateLimitStoreConfig{ Driver: "memory", @@ -136,7 +136,7 @@ func TestProject_TimeoutScenarios(t *testing.T) { // Create a rate limiters registry (not specifically needed for this test, // but it's part of the usual setup.) 
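The test hunks above and below are the same mechanical migration repeated across files: NewRateLimitersRegistry now takes a context.Context as its first argument, which the registry stores as its application context and hands to the background Redis bootstrap. A minimal sketch of the new call shape, assuming the same erpc packages these test files already import (the package clause and the helper name newTestRateLimiters are illustrative, not part of this change):

package erpc_test // hypothetical package for this sketch

import (
	"context"
	"testing"

	"github.com/erpc/erpc/common"
	"github.com/erpc/erpc/upstream"
	"github.com/rs/zerolog/log"
)

// newTestRateLimiters builds a registry the way the updated tests do.
// context.Background() is fine in tests; a long-lived server should pass
// its application context so background connection retries stop on shutdown.
func newTestRateLimiters(t *testing.T) *upstream.RateLimitersRegistry {
	t.Helper()
	rlr, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger)
	if err != nil {
		t.Fatal(err)
	}
	return rlr
}

The remaining hunks in this file and in the files below follow the same pattern.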
- rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger, ) @@ -248,7 +248,7 @@ func TestProject_TimeoutScenarios(t *testing.T) { util.SetupMocksForEvmStatePoller() defer util.AssertNoPendingMocks(t, 0) - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger, ) @@ -416,7 +416,7 @@ func TestProject_LazyLoadNetworkDefaults(t *testing.T) { } // Build ProjectsRegistry with no existing EvmJsonRpcCache or RateLimiter - rateLimiters, _ := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimiters, _ := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) ssr, err := data.NewSharedStateRegistry(ctx, &log.Logger, &common.SharedStateConfig{ Connector: &common.ConnectorConfig{ Driver: "memory", @@ -512,7 +512,7 @@ func TestProject_NetworkAlias(t *testing.T) { panic(err) } - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry( + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger, ) diff --git a/erpc/upstream_selection_test.go b/erpc/upstream_selection_test.go index f4d717474..936b45437 100644 --- a/erpc/upstream_selection_test.go +++ b/erpc/upstream_selection_test.go @@ -809,7 +809,7 @@ func setupTestNetworkWithFourUpstreams(t *testing.T, ctx context.Context, failsa func setupTestNetworkWithConfig(t *testing.T, ctx context.Context, upstreamConfigs []*common.UpstreamConfig, failsafeConfig *common.FailsafeConfig) *Network { t.Helper() - rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{}, &log.Logger) + rateLimitersRegistry, err := upstream.NewRateLimitersRegistry(context.Background(), &common.RateLimiterConfig{}, &log.Logger) require.NoError(t, err) metricsTracker := health.NewTracker(&log.Logger, "test", time.Minute) diff --git a/upstream/ratelimiter_budget.go b/upstream/ratelimiter_budget.go index ae084225d..a82b16da1 100644 --- a/upstream/ratelimiter_budget.go +++ b/upstream/ratelimiter_budget.go @@ -24,7 +24,6 @@ type RateLimiterBudget struct { Rules []*RateLimitRule registry *RateLimitersRegistry rulesMu sync.RWMutex - cache limiter.RateLimitCache maxTimeout time.Duration } @@ -73,11 +72,17 @@ type ruleResult struct { allowed bool } +// getCache returns the current cache from the registry (thread-safe) +func (b *RateLimiterBudget) getCache() limiter.RateLimitCache { + return b.registry.GetCache() +} + // TryAcquirePermit evaluates all matching rules for the given method using Envoy's DoLimit. // Rules are evaluated in parallel for lower latency. Returns true if allowed, false if rate limited. func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId string, req *common.NormalizedRequest, method string, vendor string, upstreamId string, authLabel string, origin string) (bool, error) { - if b.cache == nil { - return true, nil + cache := b.getCache() + if cache == nil { + return true, nil // Fail-open when no cache is available } ctx, span := common.StartDetailSpan(ctx, "RateLimiter.TryAcquirePermit", @@ -168,6 +173,11 @@ func (b *RateLimiterBudget) TryAcquirePermit(ctx context.Context, projectId stri // evaluateRule checks a single rate limit rule against the cache. // Returns true if allowed, false if over limit. 
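Before the evaluateRule hunk continues below, note the shape of the ratelimiter_budget.go change above: the budget no longer holds its own limiter.RateLimitCache; each permit check re-reads it from the registry via getCache() and fails open (returns true) while the cache is still nil, i.e. before Redis has connected. The hunks below extend the same guard to evaluateRule and thread the cache snapshot into doLimitWithTimeout explicitly, so the timeout goroutine keeps using the snapshot taken at the start of the evaluation instead of re-reading a field the background connect task may swap. A self-contained sketch of that timeout guard, assuming nothing beyond the standard library (the function name and the func() bool stand-in for DoLimit are illustrative):

package main

import (
	"fmt"
	"time"
)

// doLimitWithTimeoutSketch mirrors the control flow of doLimitWithTimeout
// in this diff: the (possibly slow) limiter call runs in a goroutine that
// writes to a buffered channel, and the caller fails open if the timer
// fires first.
func doLimitWithTimeoutSketch(do func() bool, maxTimeout time.Duration) (allowed, timedOut bool) {
	resultCh := make(chan bool, 1) // buffered so the goroutine never blocks after a timeout
	go func() { resultCh <- do() }()

	timer := time.NewTimer(maxTimeout)
	defer timer.Stop()

	select {
	case res := <-resultCh:
		return res, false
	case <-timer.C:
		return true, true // fail-open: a slow limiter counts as "allowed"
	}
}

func main() {
	slow := func() bool { time.Sleep(50 * time.Millisecond); return false }
	allowed, timedOut := doLimitWithTimeoutSketch(slow, 10*time.Millisecond)
	fmt.Println(allowed, timedOut) // true true: timed out and failed open
}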
func (b *RateLimiterBudget) evaluateRule(ctx context.Context, rule *RateLimitRule, method, clientIP, userLabel, networkLabel string) bool { + cache := b.getCache() + if cache == nil { + return true // Fail-open when no cache is available + } + // Build descriptor entries entries := []*pb_struct.RateLimitDescriptor_Entry{{Key: "method", Value: method}} if rule.Config.PerIP && clientIP != "" && clientIP != "n/a" { @@ -205,9 +215,9 @@ func (b *RateLimiterBudget) evaluateRule(ctx context.Context, rule *RateLimitRul var statuses []*pb.RateLimitResponse_DescriptorStatus var timedOut bool if b.maxTimeout > 0 { - statuses, timedOut = b.doLimitWithTimeout(ctx, rlReq, limits, method, userLabel, networkLabel) + statuses, timedOut = b.doLimitWithTimeout(ctx, cache, rlReq, limits, method, userLabel, networkLabel) } else { - statuses = b.cache.DoLimit(ctx, rlReq, limits) + statuses = cache.DoLimit(ctx, rlReq, limits) } if timedOut { @@ -246,13 +256,14 @@ func (r *RateLimitRule) statsKeySuffix() string { // Returns (statuses, timedOut). On timeout, returns (nil, true) and records fail-open metric. func (b *RateLimiterBudget) doLimitWithTimeout( ctx context.Context, + cache limiter.RateLimitCache, rlReq *pb.RateLimitRequest, limits []*config.RateLimit, method, userLabel, networkLabel string, ) ([]*pb.RateLimitResponse_DescriptorStatus, bool) { resultCh := make(chan []*pb.RateLimitResponse_DescriptorStatus, 1) go func() { - resultCh <- b.cache.DoLimit(ctx, rlReq, limits) + resultCh <- cache.DoLimit(ctx, rlReq, limits) }() timer := time.NewTimer(b.maxTimeout) diff --git a/upstream/ratelimiter_budget_bench_test.go b/upstream/ratelimiter_budget_bench_test.go index 9093f4d25..a2198b258 100644 --- a/upstream/ratelimiter_budget_bench_test.go +++ b/upstream/ratelimiter_budget_bench_test.go @@ -67,12 +67,15 @@ func buildBenchBudget(numRules int, perUser, perIP, perNetwork bool) *RateLimite } logger := zerolog.Nop() + registry := &RateLimitersRegistry{ + statsManager: mgr, + envoyCache: cache, + } return &RateLimiterBudget{ logger: &logger, Id: "bench-budget", Rules: rules, - registry: &RateLimitersRegistry{statsManager: mgr}, - cache: cache, + registry: registry, } } @@ -172,7 +175,7 @@ func BenchmarkTryAcquirePermit_NoRulesMatch(b *testing.B) { // BenchmarkTryAcquirePermit_NilCache tests the nil cache fast path func BenchmarkTryAcquirePermit_NilCache(b *testing.B) { budget := buildBenchBudget(1, false, false, false) - budget.cache = nil + budget.registry.envoyCache = nil // Simulate Redis not connected yet ctx := context.Background() b.ReportAllocs() @@ -208,12 +211,15 @@ func buildBenchBudgetWithDelay(numRules int, delay time.Duration) *RateLimiterBu } logger := zerolog.Nop() + registry := &RateLimitersRegistry{ + statsManager: mgr, + envoyCache: &delayedCache{inner: innerCache, delay: delay}, + } return &RateLimiterBudget{ logger: &logger, Id: "bench-budget", Rules: rules, - registry: &RateLimitersRegistry{statsManager: mgr}, - cache: &delayedCache{inner: innerCache, delay: delay}, + registry: registry, } } diff --git a/upstream/ratelimiter_registry.go b/upstream/ratelimiter_registry.go index b5c6aea40..f17d8de71 100644 --- a/upstream/ratelimiter_registry.go +++ b/upstream/ratelimiter_registry.go @@ -1,7 +1,10 @@ package upstream import ( + "context" + "fmt" "math/rand" + "runtime/debug" "strings" "sync" "time" @@ -17,18 +20,23 @@ import ( "github.com/erpc/erpc/common" "github.com/erpc/erpc/telemetry" + "github.com/erpc/erpc/util" ) type RateLimitersRegistry struct { + appCtx context.Context logger 
*zerolog.Logger cfg *common.RateLimiterConfig budgetsLimiters sync.Map envoyCache limiter.RateLimitCache statsManager stats.Manager + cacheMu sync.RWMutex + initializer *util.Initializer } -func NewRateLimitersRegistry(cfg *common.RateLimiterConfig, logger *zerolog.Logger) (*RateLimitersRegistry, error) { +func NewRateLimitersRegistry(appCtx context.Context, cfg *common.RateLimiterConfig, logger *zerolog.Logger) (*RateLimitersRegistry, error) { r := &RateLimitersRegistry{ + appCtx: appCtx, cfg: cfg, logger: logger, } @@ -42,57 +50,109 @@ func (r *RateLimitersRegistry) bootstrap() error { return nil } + // Create a default stats manager (needed even if cache is nil) + store := gostats.NewStore(gostats.NewNullSink(), false) + r.statsManager = stats.NewStatManager(store, settings.NewSettings()) + // Initialize shared cache if configured if r.cfg.Store != nil && r.cfg.Store.Driver == "redis" && r.cfg.Store.Redis != nil { - store := gostats.NewStore(gostats.NewNullSink(), false) - mgr := stats.NewStatManager(store, settings.NewSettings()) - useTLS := r.cfg.Store.Redis.TLS != nil && r.cfg.Store.Redis.TLS.Enabled - url := r.cfg.Store.Redis.URI - if url == "" { - url = r.cfg.Store.Redis.Addr + // Create initializer for background retry + r.initializer = util.NewInitializer(r.appCtx, r.logger, nil) + + // Attempt Redis connection with panic recovery - don't block startup + connectTask := util.NewBootstrapTask("redis-ratelimiter-connect", r.connectRedisTask) + if err := r.initializer.ExecuteTasks(r.appCtx, connectTask); err != nil { + // Cache stays nil - rate limiting will fail-open until Redis connects + r.logger.Warn().Err(err).Msg("failed to initialize Redis rate limiter on first attempt (rate limiting will fail-open until connected, retrying in background)") } - poolSize := r.cfg.Store.Redis.ConnPoolSize - client := redis.NewClientImpl( - store.Scope("erpc_rl"), - useTLS, - r.cfg.Store.Redis.Username, - "tcp", - "single", - url, - poolSize, - 5*time.Millisecond, - 32, - nil, - false, - nil, - ) - r.envoyCache = redis.NewFixedRateLimitCacheImpl( - client, - nil, - utils.NewTimeSourceImpl(), - rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404 - 5, - nil, - defaultNearLimitRatio(r.cfg.Store.NearLimitRatio), - defaultCacheKeyPrefix(r.cfg.Store.CacheKeyPrefix), - mgr, - false, - ) - r.statsManager = mgr } else if r.cfg.Store != nil && r.cfg.Store.Driver == "memory" { - store := gostats.NewStore(gostats.NewNullSink(), false) - mgr := stats.NewStatManager(store, settings.NewSettings()) + // Explicitly configured for memory r.envoyCache = NewMemoryRateLimitCache( utils.NewTimeSourceImpl(), rand.New(rand.NewSource(time.Now().Unix())), // #nosec G404 0, defaultNearLimitRatio(r.cfg.Store.NearLimitRatio), defaultCacheKeyPrefix(r.cfg.Store.CacheKeyPrefix), - mgr, + r.statsManager, ) - r.statsManager = mgr } + // Initialize budgets (cache may be nil for Redis until it connects) + r.initializeBudgets() + + return nil +} + +// connectRedisTask attempts to connect to Redis with panic recovery +func (r *RateLimitersRegistry) connectRedisTask(ctx context.Context) (err error) { + // Recover from panics in the envoyproxy/ratelimit library + defer func() { + if rec := recover(); rec != nil { + telemetry.MetricUnexpectedPanicTotal.WithLabelValues( + "ratelimiter-redis-connect", + fmt.Sprintf("store:%s", util.RedactEndpoint(r.cfg.Store.Redis.URI)), + common.ErrorFingerprint(rec), + ).Inc() + r.logger.Error(). + Interface("panic", rec). + Str("stack", string(debug.Stack())).
+ Msg("panic recovered during Redis rate limiter connection (rate limiting will fail-open)") + err = fmt.Errorf("panic during Redis connection: %v", rec) + } + }() + + store := gostats.NewStore(gostats.NewNullSink(), false) + mgr := stats.NewStatManager(store, settings.NewSettings()) + useTLS := r.cfg.Store.Redis.TLS != nil && r.cfg.Store.Redis.TLS.Enabled + url := r.cfg.Store.Redis.URI + if url == "" { + url = r.cfg.Store.Redis.Addr + } + poolSize := r.cfg.Store.Redis.ConnPoolSize + + r.logger.Debug().Str("url", util.RedactEndpoint(url)).Bool("tls", useTLS).Int("poolSize", poolSize).Msg("attempting to connect to Redis for rate limiting") + + client := redis.NewClientImpl( + store.Scope("erpc_rl"), + useTLS, + r.cfg.Store.Redis.Username, + "tcp", + "single", + url, + poolSize, + 5*time.Millisecond, + 32, + nil, + false, + nil, + ) + + cache := redis.NewFixedRateLimitCacheImpl( + client, + nil, + utils.NewTimeSourceImpl(), + rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404 + 5, + nil, + defaultNearLimitRatio(r.cfg.Store.NearLimitRatio), + defaultCacheKeyPrefix(r.cfg.Store.CacheKeyPrefix), + mgr, + false, + ) + + // Successfully connected - update the cache + // Note: statsManager is NOT updated here to avoid data races. + // The statsManager created in bootstrap() is sufficient and identical. + r.cacheMu.Lock() + r.envoyCache = cache + r.cacheMu.Unlock() + + r.logger.Info().Str("url", util.RedactEndpoint(url)).Msg("successfully connected to Redis for rate limiting") + return nil +} + +// initializeBudgets creates the rate limiter budgets +func (r *RateLimitersRegistry) initializeBudgets() { for _, budgetCfg := range r.cfg.Budgets { lg := r.logger.With().Str("budget", budgetCfg.Id).Logger() lg.Debug().Msgf("initializing rate limiter budget") @@ -105,7 +165,6 @@ func (r *RateLimitersRegistry) bootstrap() error { Rules: make([]*RateLimitRule, 0), registry: r, logger: &lg, - cache: r.envoyCache, maxTimeout: maxTimeout, } @@ -131,8 +190,13 @@ func (r *RateLimitersRegistry) bootstrap() error { r.budgetsLimiters.Store(budgetCfg.Id, budget) } +} - return nil +// GetCache returns the current rate limit cache (thread-safe) +func (r *RateLimitersRegistry) GetCache() limiter.RateLimitCache { + r.cacheMu.RLock() + defer r.cacheMu.RUnlock() + return r.envoyCache } func (r *RateLimitersRegistry) GetBudget(budgetId string) (*RateLimiterBudget, error) { diff --git a/upstream/ratelimiter_test.go b/upstream/ratelimiter_test.go index 5fb758605..b59c74487 100644 --- a/upstream/ratelimiter_test.go +++ b/upstream/ratelimiter_test.go @@ -16,7 +16,7 @@ func TestRateLimitersRegistry_New(t *testing.T) { logger := zerolog.Nop() t.Run("nil config", func(t *testing.T) { - registry, err := NewRateLimitersRegistry(nil, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), nil, &logger) require.NoError(t, err) assert.NotNil(t, registry) }) @@ -37,7 +37,7 @@ func TestRateLimitersRegistry_New(t *testing.T) { }, }, } - registry, err := NewRateLimitersRegistry(cfg, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), cfg, &logger) require.NoError(t, err) assert.NotNil(t, registry) }) @@ -60,7 +60,7 @@ func TestRateLimitersRegistry_GetBudget(t *testing.T) { }, }, } - registry, err := NewRateLimitersRegistry(cfg, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), cfg, &logger) require.NoError(t, err) t.Run("existing budget", func(t *testing.T) { @@ -106,7 +106,7 @@ func TestRateLimiterBudget_GetRulesByMethod(t *testing.T) { }, }, } - registry, err := 
NewRateLimitersRegistry(cfg, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), cfg, &logger) require.NoError(t, err) budget, err := registry.GetBudget("test-budget") @@ -152,7 +152,7 @@ func TestRateLimiter_ConcurrentPermits(t *testing.T) { }, }, } - registry, err := NewRateLimitersRegistry(cfg, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), cfg, &logger) require.NoError(t, err) budget, err := registry.GetBudget("test-budget") @@ -203,7 +203,7 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) { }, } - registry, err := NewRateLimitersRegistry(cfg, &logger) + registry, err := NewRateLimitersRegistry(context.Background(), cfg, &logger) require.NoError(t, err) budget, err := registry.GetBudget("test-budget") diff --git a/upstream/registry_contention_bench_test.go b/upstream/registry_contention_bench_test.go index 6dd5bcc72..bccac6e60 100644 --- a/upstream/registry_contention_bench_test.go +++ b/upstream/registry_contention_bench_test.go @@ -25,7 +25,7 @@ func buildRegistryForBench(b *testing.B, numNetworks, upstreamsPerNetwork int, m vr := thirdparty.NewVendorsRegistry() pr, _ := thirdparty.NewProvidersRegistry(&log.Logger, vr, nil, nil) - rlr, _ := NewRateLimitersRegistry(nil, &log.Logger) + rlr, _ := NewRateLimitersRegistry(context.Background(), nil, &log.Logger) mt := health.NewTracker(&log.Logger, "bench-prj", time.Minute) reg := NewUpstreamsRegistry(
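Taken together, the ratelimiter_registry.go changes implement a lazily-initialized, fail-open limiter backend: bootstrap no longer blocks on Redis; connectRedisTask retries in the background, with a recover() that converts panics from the envoyproxy/ratelimit client into an error on the named return so a library crash degrades to fail-open rather than killing the process; and GetCache() publishes the connected cache under an RWMutex. A condensed, runnable sketch of that pattern, with a map standing in for limiter.RateLimitCache (all names here are illustrative, not the real erpc API):

package main

import (
	"fmt"
	"sync"
)

type lazyLimiter struct {
	mu    sync.RWMutex
	cache map[string]int // stand-in for limiter.RateLimitCache
}

// get mirrors GetCache: readers take an RLock and may observe nil.
func (l *lazyLimiter) get() map[string]int {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.cache
}

// set mirrors the swap at the end of connectRedisTask.
func (l *lazyLimiter) set(c map[string]int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.cache = c
}

// allow mirrors TryAcquirePermit's guard: a nil cache means fail-open.
func (l *lazyLimiter) allow(key string) bool {
	c := l.get()
	if c == nil {
		return true // backend not connected yet: admit the request
	}
	return c[key] < 100 // the real code calls cache.DoLimit here
}

func main() {
	l := &lazyLimiter{}
	fmt.Println(l.allow("eth_call")) // true: fails open before connect
	l.set(map[string]int{"eth_call": 100})
	fmt.Println(l.allow("eth_call")) // false: enforced once connected
}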