From 47c0cae65ce37e3da53a2edc4177e4dac30813f2 Mon Sep 17 00:00:00 2001 From: dthuynh Date: Wed, 27 May 2026 14:05:49 +0700 Subject: [PATCH 1/2] redis: bound cluster pipeline parallelism Signed-off-by: dthuynh --- README.md | 1 + go.mod | 1 + go.sum | 2 + src/redis/cache_impl.go | 12 ++- src/redis/driver_impl.go | 110 ++++++++++++++++------ src/redis/driver_impl_test.go | 169 ++++++++++++++++++++++++++++++++++ src/settings/settings.go | 26 ++++-- src/settings/settings_test.go | 31 +++++++ 8 files changed, 313 insertions(+), 39 deletions(-) create mode 100644 src/redis/driver_impl_test.go diff --git a/README.md b/README.md index 6cf2e51c..b5c24334 100644 --- a/README.md +++ b/README.md @@ -1341,6 +1341,7 @@ For high throughput scenarios, ratelimit supports write buffering via [radix v4' 1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.** 1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above. +1. `REDIS_CLUSTER_PIPELINE_PARALLELISM` & `REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM`: controls per-key pipeline group concurrency in Redis Cluster mode. Default: `1`, which preserves the legacy serial behavior. Set to `0` for unbounded parallelism, or a value greater than `1` to bound concurrent Redis group calls per pipeline execution. Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns. diff --git a/go.mod b/go.mod index 4a05be8d..2653b82b 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 golang.org/x/net v0.52.0 + golang.org/x/sync v0.20.0 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 55358ed7..dabb6e1f 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 266c0a4d..f7b7072c 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -18,15 +18,19 @@ func NewRateLimiterCacheImplFromSettings(ctx context.Context, s settings.Setting closer := &utils.MultiCloser{} var perSecondPool Client if s.RedisPerSecond { - perSecondPool = NewClientImpl(ctx, srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, + perSecondPool = newClientImpl(ctx, srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout, - s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth, s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime) + s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth, + s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime, + s.RedisPerSecondClusterPipelineParallelism) closer.Closers = append(closer.Closers, perSecondPool) } - otherPool := NewClientImpl(ctx, srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, + otherPool := newClientImpl(ctx, srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout, - s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth, s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime) + s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth, + s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime, + s.RedisClusterPipelineParallelism) closer.Closers = append(closer.Closers, otherPool) return NewFixedRateLimitCacheImpl( diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index f65269eb..f92d245a 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -13,6 +13,7 @@ import ( "github.com/mediocregopher/radix/v4" "github.com/mediocregopher/radix/v4/trace" logger "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "github.com/envoyproxy/ratelimit/src/server" "github.com/envoyproxy/ratelimit/src/utils" @@ -69,9 +70,10 @@ type redisClient interface { } type clientImpl struct { - client redisClient - stats poolStats - isCluster bool + client redisClient + stats poolStats + isCluster bool + clusterPipelineParallelism int } func checkError(err error) { @@ -124,6 +126,18 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string, startupInitialInterval, startupMaxInterval, startupMaxElapsedTime time.Duration, +) Client { + return newClientImpl(ctx, scope, useTls, auth, redisSocketType, redisType, url, poolSize, + pipelineWindow, pipelineLimit, tlsConfig, healthCheckActiveConnection, srv, + timeout, poolOnEmptyBehavior, sentinelAuth, + startupInitialInterval, startupMaxInterval, startupMaxElapsedTime, 1) +} + +func newClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, + pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server, + timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string, + startupInitialInterval, startupMaxInterval, startupMaxElapsedTime time.Duration, + clusterPipelineParallelism int, ) Client { maskedUrl := utils.MaskCredentialsInUrl(url) logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize) @@ -155,6 +169,18 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re poolConfig.Dialer.WriteFlushInterval = pipelineWindow logger.Debugf("Cluster mode: setting WriteFlushInterval to %v", pipelineWindow) } + if isCluster { + switch { + case clusterPipelineParallelism < 0: + panic(RedisError("REDIS_CLUSTER_PIPELINE_PARALLELISM must be >= 0")) + case clusterPipelineParallelism == 0: + logger.Warnf("Redis cluster pipeline parallelism: unbounded") + case clusterPipelineParallelism == 1: + logger.Warnf("Redis cluster pipeline parallelism: disabled (serial legacy behavior)") + default: + logger.Warnf("Redis cluster pipeline parallelism: bounded to %d concurrent groups", clusterPipelineParallelism) + } + } // IMPORTANT: radix v4 pool behavior changes from v3 // @@ -278,9 +304,10 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re } return &clientImpl{ - client: client, - stats: stats, - isCluster: isCluster, + client: client, + stats: stats, + isCluster: isCluster, + clusterPipelineParallelism: clusterPipelineParallelism, } } @@ -329,13 +356,25 @@ func (c *clientImpl) PipeDo(pipeline Pipeline) error { return c.client.Do(ctx, p) } -// executeGroupedPipeline groups pipeline actions by key and executes each group -// as a separate pipeline. This allows same-key commands (like INCRBY + EXPIRE) -// to be pipelined together even in cluster mode. +// executeGroupedPipeline routes a pipeline of Redis actions in cluster mode +// via a three-tier dispatch: +// +// 1. Single-action fast path (len==1): skip grouping entirely. +// 2. Serial compatibility path: clusterPipelineParallelism == 1 preserves +// the pre-parallelization behavior. +// 3. General path: group actions by key (same-key commands like INCRBY + +// EXPIRE are still pipelined together) and execute groups concurrently +// via errgroup. clusterPipelineParallelism == 0 is unbounded; values >1 +// bound the number of concurrent groups. func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeline) error { - // Group actions by key, preserving first-occurrence order - var groups [][]radix.Action - keyToIndex := make(map[string]int) + // Tier 1: single action — skip grouping, skip map alloc. + if len(pipeline) == 1 { + return c.client.Do(ctx, pipeline[0].Action) + } + + // Tier 2: group by key, preserving first-occurrence order. + groups := make([][]radix.Action, 0, len(pipeline)) + keyToIndex := make(map[string]int, len(pipeline)) for _, pa := range pipeline { if idx, exists := keyToIndex[pa.Key]; exists { @@ -346,23 +385,40 @@ func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeli } } - // Execute each group + if c.clusterPipelineParallelism == 1 { + return c.doPipelineGroupsSerial(ctx, groups) + } + + // Execute groups concurrently. + eg, egCtx := errgroup.WithContext(ctx) + if c.clusterPipelineParallelism > 1 { + eg.SetLimit(c.clusterPipelineParallelism) + } for _, actions := range groups { - if len(actions) == 1 { - if err := c.client.Do(ctx, actions[0]); err != nil { - return err - } - } else { - // Multiple commands for same key: pipeline them together - p := radix.NewPipeline() - for _, action := range actions { - p.Append(action) - } - if err := c.client.Do(ctx, p); err != nil { - return err - } - } + actions := actions + eg.Go(func() error { + return c.doPipelineGroup(egCtx, actions) + }) } + return eg.Wait() +} +func (c *clientImpl) doPipelineGroupsSerial(ctx context.Context, groups [][]radix.Action) error { + for _, actions := range groups { + if err := c.doPipelineGroup(ctx, actions); err != nil { + return err + } + } return nil } + +func (c *clientImpl) doPipelineGroup(ctx context.Context, actions []radix.Action) error { + if len(actions) == 1 { + return c.client.Do(ctx, actions[0]) + } + p := radix.NewPipeline() + for _, action := range actions { + p.Append(action) + } + return c.client.Do(ctx, p) +} diff --git a/src/redis/driver_impl_test.go b/src/redis/driver_impl_test.go new file mode 100644 index 00000000..dfe78785 --- /dev/null +++ b/src/redis/driver_impl_test.go @@ -0,0 +1,169 @@ +package redis + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/mediocregopher/radix/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testAction struct { + key string + delay time.Duration + err error +} + +func (a *testAction) Properties() radix.ActionProperties { + return radix.ActionProperties{ + Keys: []string{a.key}, + CanRetry: true, + CanPipeline: true, + } +} + +func (a *testAction) Perform(ctx context.Context, _ radix.Conn) error { + if a.delay > 0 { + timer := time.NewTimer(a.delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + } + } + return a.err +} + +type recordingRedisClient struct { + mu sync.Mutex + calls []radix.Action + inFlight int + maxInFlight int +} + +func (c *recordingRedisClient) Do(ctx context.Context, action radix.Action) error { + c.mu.Lock() + c.calls = append(c.calls, action) + c.inFlight++ + if c.inFlight > c.maxInFlight { + c.maxInFlight = c.inFlight + } + c.mu.Unlock() + + defer func() { + c.mu.Lock() + c.inFlight-- + c.mu.Unlock() + }() + + if action, ok := action.(*testAction); ok { + return action.Perform(ctx, nil) + } + return nil +} + +func (c *recordingRedisClient) Close() error { + return nil +} + +func (c *recordingRedisClient) callCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.calls) +} + +func (c *recordingRedisClient) maxConcurrentCalls() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.maxInFlight +} + +func TestExecuteGroupedPipelineSingleActionFastPath(t *testing.T) { + fakeClient := &recordingRedisClient{} + client := &clientImpl{ + client: fakeClient, + clusterPipelineParallelism: 1, + } + + err := client.executeGroupedPipeline(context.Background(), Pipeline{ + {Key: "a", Action: &testAction{key: "a"}}, + }) + + require.NoError(t, err) + assert.Equal(t, 1, fakeClient.callCount()) +} + +func TestExecuteGroupedPipelineSerialCompatibilityStopsOnFirstError(t *testing.T) { + expectedErr := errors.New("redis failed") + fakeClient := &recordingRedisClient{} + client := &clientImpl{ + client: fakeClient, + clusterPipelineParallelism: 1, + } + + err := client.executeGroupedPipeline(context.Background(), Pipeline{ + {Key: "a", Action: &testAction{key: "a", err: expectedErr}}, + {Key: "b", Action: &testAction{key: "b"}}, + }) + + require.ErrorIs(t, err, expectedErr) + assert.Equal(t, 1, fakeClient.callCount()) + assert.Equal(t, 1, fakeClient.maxConcurrentCalls()) +} + +func TestExecuteGroupedPipelineGroupsSameKeyActions(t *testing.T) { + fakeClient := &recordingRedisClient{} + client := &clientImpl{ + client: fakeClient, + clusterPipelineParallelism: 0, + } + + err := client.executeGroupedPipeline(context.Background(), Pipeline{ + {Key: "a", Action: &testAction{key: "a"}}, + {Key: "a", Action: &testAction{key: "a"}}, + }) + + require.NoError(t, err) + assert.Equal(t, 1, fakeClient.callCount()) +} + +func TestExecuteGroupedPipelineUnboundedParallelism(t *testing.T) { + fakeClient := &recordingRedisClient{} + client := &clientImpl{ + client: fakeClient, + clusterPipelineParallelism: 0, + } + + err := client.executeGroupedPipeline(context.Background(), Pipeline{ + {Key: "a", Action: &testAction{key: "a", delay: 100 * time.Millisecond}}, + {Key: "b", Action: &testAction{key: "b", delay: 100 * time.Millisecond}}, + {Key: "c", Action: &testAction{key: "c", delay: 100 * time.Millisecond}}, + }) + + require.NoError(t, err) + assert.Equal(t, 3, fakeClient.callCount()) + assert.Equal(t, 3, fakeClient.maxConcurrentCalls()) +} + +func TestExecuteGroupedPipelineBoundedParallelism(t *testing.T) { + fakeClient := &recordingRedisClient{} + client := &clientImpl{ + client: fakeClient, + clusterPipelineParallelism: 2, + } + + err := client.executeGroupedPipeline(context.Background(), Pipeline{ + {Key: "a", Action: &testAction{key: "a", delay: 100 * time.Millisecond}}, + {Key: "b", Action: &testAction{key: "b", delay: 100 * time.Millisecond}}, + {Key: "c", Action: &testAction{key: "c", delay: 100 * time.Millisecond}}, + }) + + require.NoError(t, err) + assert.Equal(t, 3, fakeClient.callCount()) + assert.Equal(t, 2, fakeClient.maxConcurrentCalls()) +} diff --git a/src/settings/settings.go b/src/settings/settings.go index 83d65776..e3c347b7 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -149,14 +149,20 @@ type Settings struct { // RedisPipelineLimit is DEPRECATED and unused in radix v4. // This setting has no effect. Radix v4 does not support explicit pipeline size limits. // Write buffering is controlled solely by RedisPipelineWindow (WriteFlushInterval). - RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` - RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` - RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` - RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` - RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` - RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"` - RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""` - RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` + // RedisClusterPipelineParallelism controls how many per-key pipeline groups + // can be executed concurrently in Redis Cluster mode. + // - 0: unbounded parallelism + // - 1: serial legacy behavior (default) + // - >1: bounded parallelism + RedisClusterPipelineParallelism int `envconfig:"REDIS_CLUSTER_PIPELINE_PARALLELISM" default:"1"` + RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` + RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` + RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"` + RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` + RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"` + RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""` + RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"` // RedisSentinelAuth is the password for authenticating to Redis Sentinel nodes (not the Redis master/replica). // This is separate from RedisAuth which is used for authenticating to the Redis master/replica nodes. // If empty, no authentication will be attempted when connecting to Sentinel nodes. @@ -171,6 +177,10 @@ type Settings struct { // RedisPerSecondPipelineLimit is DEPRECATED and unused in radix v4. // See comments of RedisPipelineLimit for details. RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` + // RedisPerSecondClusterPipelineParallelism controls per-key pipeline group + // concurrency for per-second Redis Cluster mode. + // See comments of RedisClusterPipelineParallelism for details. + RedisPerSecondClusterPipelineParallelism int `envconfig:"REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM" default:"1"` // Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed. RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"` // RedisTimeout sets the timeout for Redis connection and I/O operations. diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index f15fbea2..76aa0ffd 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -97,6 +97,37 @@ func TestRedisPerSecondPoolOnEmptyBehavior_Error(t *testing.T) { assert.Equal(t, "ERROR", settings.RedisPerSecondPoolOnEmptyBehavior) } +func TestRedisClusterPipelineParallelism_Default(t *testing.T) { + os.Unsetenv("REDIS_CLUSTER_PIPELINE_PARALLELISM") + os.Unsetenv("REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM") + + settings := NewSettings() + + assert.Equal(t, 1, settings.RedisClusterPipelineParallelism) + assert.Equal(t, 1, settings.RedisPerSecondClusterPipelineParallelism) +} + +func TestRedisClusterPipelineParallelism_Configured(t *testing.T) { + os.Setenv("REDIS_CLUSTER_PIPELINE_PARALLELISM", "8") + os.Setenv("REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM", "4") + defer os.Unsetenv("REDIS_CLUSTER_PIPELINE_PARALLELISM") + defer os.Unsetenv("REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM") + + settings := NewSettings() + + assert.Equal(t, 8, settings.RedisClusterPipelineParallelism) + assert.Equal(t, 4, settings.RedisPerSecondClusterPipelineParallelism) +} + +func TestRedisClusterPipelineParallelism_Unbounded(t *testing.T) { + os.Setenv("REDIS_CLUSTER_PIPELINE_PARALLELISM", "0") + defer os.Unsetenv("REDIS_CLUSTER_PIPELINE_PARALLELISM") + + settings := NewSettings() + + assert.Equal(t, 0, settings.RedisClusterPipelineParallelism) +} + // Test both pools can be configured independently func TestRedisPoolOnEmptyBehavior_IndependentConfiguration(t *testing.T) { os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR") From 99caa2288525b59306a7d5b87add6a846bbd4d58 Mon Sep 17 00:00:00 2001 From: dthuynh Date: Fri, 5 Jun 2026 10:24:09 +0700 Subject: [PATCH 2/2] Refactor to address comment: use gRPC request context in PipeDo, cap the parallelism to RedisPoolSize Signed-off-by: dthuynh --- README.md | 2 +- src/redis/driver.go | 9 ++++- src/redis/driver_impl.go | 43 ++++++++++++++++----- src/redis/driver_impl_test.go | 58 +++++++++++++++++++++++++++-- src/redis/fixed_cache_impl.go | 8 ++-- src/settings/settings.go | 4 +- src/settings/settings_test.go | 2 +- test/mocks/redis/redis.go | 9 +++-- test/redis/driver_impl_test.go | 10 ++--- test/redis/fixed_cache_impl_test.go | 46 +++++++++++------------ 10 files changed, 136 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index b5c24334..f2e8d6df 100644 --- a/README.md +++ b/README.md @@ -1341,7 +1341,7 @@ For high throughput scenarios, ratelimit supports write buffering via [radix v4' 1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.** 1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above. -1. `REDIS_CLUSTER_PIPELINE_PARALLELISM` & `REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM`: controls per-key pipeline group concurrency in Redis Cluster mode. Default: `1`, which preserves the legacy serial behavior. Set to `0` for unbounded parallelism, or a value greater than `1` to bound concurrent Redis group calls per pipeline execution. +1. `REDIS_CLUSTER_PIPELINE_PARALLELISM` & `REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM`: controls per-key pipeline group concurrency in Redis Cluster mode. Default: `1`, which preserves the legacy serial behavior. Set to `0` for auto parallelism bounded by the corresponding Redis pool size, or a value greater than `1` to bound concurrent Redis group calls per pipeline execution. Configured values greater than the corresponding Redis pool size are capped to the pool size. Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns. diff --git a/src/redis/driver.go b/src/redis/driver.go index 8f52df42..4c6bd4f8 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -1,6 +1,10 @@ package redis -import "github.com/mediocregopher/radix/v4" +import ( + "context" + + "github.com/mediocregopher/radix/v4" +) // Errors that may be raised during config parsing. type RedisError string @@ -32,8 +36,9 @@ type Client interface { // a single write, then reads their responses in a single read. This reduces // network delay into a single round-trip. // + // @param ctx supplies the context for Redis I/O. // @param pipeline supplies the queue for pending commands. - PipeDo(pipeline Pipeline) error + PipeDo(ctx context.Context, pipeline Pipeline) error // Once Close() is called all future method calls on the Client will return // an error diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index f92d245a..6b93d02a 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -82,6 +82,27 @@ func checkError(err error) { } } +func effectiveClusterPipelineParallelism(configuredParallelism, poolSize int) int { + if configuredParallelism < 0 { + panic(RedisError("redis cluster pipeline parallelism must be >= 0")) + } + + if configuredParallelism == 1 { + return 1 + } + + poolCeiling := poolSize + if poolCeiling < 1 { + poolCeiling = 1 + } + + if configuredParallelism == 0 || configuredParallelism > poolCeiling { + return poolCeiling + } + + return configuredParallelism +} + // createDialer creates a radix.Dialer with timeout, TLS, and auth configuration // targetName is used for logging to identify the connection target (e.g., URL, "sentinel(url)") func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, auth string, targetName string) radix.Dialer { @@ -169,16 +190,19 @@ func newClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re poolConfig.Dialer.WriteFlushInterval = pipelineWindow logger.Debugf("Cluster mode: setting WriteFlushInterval to %v", pipelineWindow) } + + effectivePipelineParallelism := clusterPipelineParallelism if isCluster { + effectivePipelineParallelism = effectiveClusterPipelineParallelism(clusterPipelineParallelism, poolSize) switch { - case clusterPipelineParallelism < 0: - panic(RedisError("REDIS_CLUSTER_PIPELINE_PARALLELISM must be >= 0")) case clusterPipelineParallelism == 0: - logger.Warnf("Redis cluster pipeline parallelism: unbounded") - case clusterPipelineParallelism == 1: + logger.Warnf("Redis cluster pipeline parallelism: auto bounded to Redis pool size (%d concurrent groups)", effectivePipelineParallelism) + case clusterPipelineParallelism != effectivePipelineParallelism: + logger.Warnf("Redis cluster pipeline parallelism: configured value %d exceeds Redis pool size %d; bounded to %d concurrent groups", clusterPipelineParallelism, poolSize, effectivePipelineParallelism) + case effectivePipelineParallelism == 1: logger.Warnf("Redis cluster pipeline parallelism: disabled (serial legacy behavior)") default: - logger.Warnf("Redis cluster pipeline parallelism: bounded to %d concurrent groups", clusterPipelineParallelism) + logger.Warnf("Redis cluster pipeline parallelism: bounded to %d concurrent groups", effectivePipelineParallelism) } } @@ -307,7 +331,7 @@ func newClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re client: client, stats: stats, isCluster: isCluster, - clusterPipelineParallelism: clusterPipelineParallelism, + clusterPipelineParallelism: effectivePipelineParallelism, } } @@ -339,8 +363,7 @@ func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key str }) } -func (c *clientImpl) PipeDo(pipeline Pipeline) error { - ctx := context.Background() +func (c *clientImpl) PipeDo(ctx context.Context, pipeline Pipeline) error { if c.isCluster { // Cluster mode: group commands by key and execute each group as a pipeline. // This ensures INCRBY + EXPIRE for the same key are pipelined together (same slot), @@ -364,8 +387,8 @@ func (c *clientImpl) PipeDo(pipeline Pipeline) error { // the pre-parallelization behavior. // 3. General path: group actions by key (same-key commands like INCRBY + // EXPIRE are still pipelined together) and execute groups concurrently -// via errgroup. clusterPipelineParallelism == 0 is unbounded; values >1 -// bound the number of concurrent groups. +// via errgroup with clusterPipelineParallelism as the max concurrent group +// count. func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeline) error { // Tier 1: single action — skip grouping, skip map alloc. if len(pipeline) == 1 { diff --git a/src/redis/driver_impl_test.go b/src/redis/driver_impl_test.go index dfe78785..57a6864c 100644 --- a/src/redis/driver_impl_test.go +++ b/src/redis/driver_impl_test.go @@ -83,6 +83,58 @@ func (c *recordingRedisClient) maxConcurrentCalls() int { return c.maxInFlight } +func TestEffectiveClusterPipelineParallelism(t *testing.T) { + tests := []struct { + name string + configuredParallelism int + poolSize int + want int + }{ + { + name: "serial legacy behavior", + configuredParallelism: 1, + poolSize: 10, + want: 1, + }, + { + name: "auto uses pool size", + configuredParallelism: 0, + poolSize: 10, + want: 10, + }, + { + name: "bounded below pool size", + configuredParallelism: 8, + poolSize: 10, + want: 8, + }, + { + name: "configured value is capped to pool size", + configuredParallelism: 20, + poolSize: 10, + want: 10, + }, + { + name: "pool ceiling never produces zero", + configuredParallelism: 0, + poolSize: 0, + want: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, effectiveClusterPipelineParallelism(tt.configuredParallelism, tt.poolSize)) + }) + } +} + +func TestEffectiveClusterPipelineParallelismRejectsNegativeConfig(t *testing.T) { + assert.Panics(t, func() { + effectiveClusterPipelineParallelism(-1, 10) + }) +} + func TestExecuteGroupedPipelineSingleActionFastPath(t *testing.T) { fakeClient := &recordingRedisClient{} client := &clientImpl{ @@ -120,7 +172,7 @@ func TestExecuteGroupedPipelineGroupsSameKeyActions(t *testing.T) { fakeClient := &recordingRedisClient{} client := &clientImpl{ client: fakeClient, - clusterPipelineParallelism: 0, + clusterPipelineParallelism: 2, } err := client.executeGroupedPipeline(context.Background(), Pipeline{ @@ -132,11 +184,11 @@ func TestExecuteGroupedPipelineGroupsSameKeyActions(t *testing.T) { assert.Equal(t, 1, fakeClient.callCount()) } -func TestExecuteGroupedPipelineUnboundedParallelism(t *testing.T) { +func TestExecuteGroupedPipelineParallelismAllowsConcurrentGroups(t *testing.T) { fakeClient := &recordingRedisClient{} client := &clientImpl{ client: fakeClient, - clusterPipelineParallelism: 0, + clusterPipelineParallelism: 3, } err := client.executeGroupedPipeline(context.Background(), Pipeline{ diff --git a/src/redis/fixed_cache_impl.go b/src/redis/fixed_cache_impl.go index 1034d3be..0bbaae7e 100644 --- a/src/redis/fixed_cache_impl.go +++ b/src/redis/fixed_cache_impl.go @@ -134,10 +134,10 @@ func (this *fixedRateLimitCacheImpl) DoLimit( } if pipelineToGet != nil { - checkError(this.client.PipeDo(pipelineToGet)) + checkError(this.client.PipeDo(ctx, pipelineToGet)) } if perSecondPipelineToGet != nil { - checkError(this.perSecondClient.PipeDo(perSecondPipelineToGet)) + checkError(this.perSecondClient.PipeDo(ctx, perSecondPipelineToGet)) } for i, cacheKey := range cacheKeys { @@ -196,10 +196,10 @@ func (this *fixedRateLimitCacheImpl) DoLimit( defer span.End() if pipeline != nil { - checkError(this.client.PipeDo(pipeline)) + checkError(this.client.PipeDo(ctx, pipeline)) } if perSecondPipeline != nil { - checkError(this.perSecondClient.PipeDo(perSecondPipeline)) + checkError(this.perSecondClient.PipeDo(ctx, perSecondPipeline)) } // Now fetch the pipeline. diff --git a/src/settings/settings.go b/src/settings/settings.go index e3c347b7..2129cf80 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -152,9 +152,9 @@ type Settings struct { RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` // RedisClusterPipelineParallelism controls how many per-key pipeline groups // can be executed concurrently in Redis Cluster mode. - // - 0: unbounded parallelism + // - 0: auto, bounded to REDIS_POOL_SIZE // - 1: serial legacy behavior (default) - // - >1: bounded parallelism + // - >1: bounded parallelism, capped to REDIS_POOL_SIZE RedisClusterPipelineParallelism int `envconfig:"REDIS_CLUSTER_PIPELINE_PARALLELISM" default:"1"` RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index 76aa0ffd..c6ab2fd0 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -119,7 +119,7 @@ func TestRedisClusterPipelineParallelism_Configured(t *testing.T) { assert.Equal(t, 4, settings.RedisPerSecondClusterPipelineParallelism) } -func TestRedisClusterPipelineParallelism_Unbounded(t *testing.T) { +func TestRedisClusterPipelineParallelism_Auto(t *testing.T) { os.Setenv("REDIS_CLUSTER_PIPELINE_PARALLELISM", "0") defer os.Unsetenv("REDIS_CLUSTER_PIPELINE_PARALLELISM") diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 192b41c2..65410f2e 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -5,6 +5,7 @@ package mock_redis import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -102,15 +103,15 @@ func (mr *MockClientMockRecorder) PipeAppend(arg0, arg1, arg2, arg3 interface{}, } // PipeDo mocks base method -func (m *MockClient) PipeDo(arg0 redis.Pipeline) error { +func (m *MockClient) PipeDo(arg0 context.Context, arg1 redis.Pipeline) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PipeDo", arg0) + ret := m.ctrl.Call(m, "PipeDo", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // PipeDo indicates an expected call of PipeDo -func (mr *MockClientMockRecorder) PipeDo(arg0 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) PipeDo(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PipeDo", reflect.TypeOf((*MockClient)(nil).PipeDo), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PipeDo", reflect.TypeOf((*MockClient)(nil).PipeDo), arg0, arg1) } diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index 323c51e3..ecfa5f69 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -178,7 +178,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f pipeline = client.PipeAppend(pipeline, nil, "SET", "foo", "bar") pipeline = client.PipeAppend(pipeline, &res, "GET", "foo") - assert.Nil(t, client.PipeDo(pipeline)) + assert.Nil(t, client.PipeDo(context.Background(), pipeline)) assert.Equal(t, "bar", res) }) @@ -190,10 +190,10 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f var res uint32 hits := uint32(1) - assert.Nil(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) + assert.Nil(t, client.PipeDo(context.Background(), client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) assert.Equal(t, hits, res) - assert.Nil(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) + assert.Nil(t, client.PipeDo(context.Background(), client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) assert.Equal(t, uint32(2), res) }) @@ -201,7 +201,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f redisSrv := mustNewRedisServer() client := mkRedisClient(redisSrv.Addr()) - assert.Nil(t, nil, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "SET", "foo", "bar"))) + assert.Nil(t, nil, client.PipeDo(context.Background(), client.PipeAppend(redis.Pipeline{}, nil, "SET", "foo", "bar"))) redisSrv.Close() @@ -216,7 +216,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f assert.True(t, hasConnectionError, "expected connection error, got: %s", errMsg) } - expectErrContainEOF(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "GET", "foo"))) + expectErrContainEOF(t, client.PipeDo(context.Background(), client.PipeAppend(redis.Pipeline{}, nil, "GET", "foo"))) }) } } diff --git a/test/redis/fixed_cache_impl_test.go b/test/redis/fixed_cache_impl_test.go index e52abb68..1de91d01 100644 --- a/test/redis/fixed_cache_impl_test.go +++ b/test/redis/fixed_cache_impl_test.go @@ -69,7 +69,7 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key_value_1234", uint64(1)).SetArg(1, uint64(5)).DoAndReturn(pipeAppend) clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(1)).DoAndReturn(pipeAppend) - clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) + clientUsed.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key_value"), false, false, false, "", nil, false)} @@ -87,7 +87,7 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key2_value2_subkey2_subvalue2_1200", uint64(1)).SetArg(1, uint64(11)).DoAndReturn(pipeAppend) clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key2_value2_subkey2_subvalue2_1200", int64(60)).DoAndReturn(pipeAppend) - clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) + clientUsed.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequestWithPerDescriptorHitsAddend( "domain", @@ -118,7 +118,7 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key3_value3_subkey3_subvalue3_950400", uint64(1)).SetArg(1, uint64(13)).DoAndReturn(pipeAppend) clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key3_value3_subkey3_subvalue3_950400", int64(86400)).DoAndReturn(pipeAppend) - clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) + clientUsed.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequestWithPerDescriptorHitsAddend( "domain", @@ -206,7 +206,7 @@ func TestOverLimitWithLocalCache(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(11)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -233,7 +233,7 @@ func TestOverLimitWithLocalCache(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(13)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -254,7 +254,7 @@ func TestOverLimitWithLocalCache(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(16)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -306,7 +306,7 @@ func TestNearLimit(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(11)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -329,7 +329,7 @@ func TestNearLimit(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(13)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -347,7 +347,7 @@ func TestNearLimit(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(16)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -364,7 +364,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key5_value5_1234", uint64(3)).SetArg(1, uint64(5)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key5_value5_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key5", "value5"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key5_value5"), false, false, false, "", nil, false)} @@ -381,7 +381,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key6_value6_1234", uint64(2)).SetArg(1, uint64(7)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key6_value6_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key6", "value6"}}}, 2) limits = []*config.RateLimit{config.NewRateLimit(8, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key6_value6"), false, false, false, "", nil, false)} @@ -398,7 +398,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key7_value7_1234", uint64(3)).SetArg(1, uint64(19)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key7_value7_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key7", "value7"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key7_value7"), false, false, false, "", nil, false)} @@ -415,7 +415,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key8_value8_1234", uint64(3)).SetArg(1, uint64(22)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key8_value8_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key8", "value8"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key8_value8"), false, false, false, "", nil, false)} @@ -432,7 +432,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key9_value9_1234", uint64(7)).SetArg(1, uint64(22)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key9_value9_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key9", "value9"}}}, 7) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key9_value9"), false, false, false, "", nil, false)} @@ -449,7 +449,7 @@ func TestNearLimit(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key10_value10_1234", uint64(3)).SetArg(1, uint64(30)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key10_value10_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key10", "value10"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key10_value10"), false, false, false, "", nil, false)} @@ -479,7 +479,7 @@ func TestRedisWithJitter(t *testing.T) { jitterSource.EXPECT().Int63().Return(int64(100)) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key_value_1234", uint64(1)).SetArg(1, uint64(5)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(101)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key_value"), false, false, false, "", nil, false)} @@ -514,7 +514,7 @@ func TestOverLimitWithLocalCacheShadowRule(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(11)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -541,7 +541,7 @@ func TestOverLimitWithLocalCacheShadowRule(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(13)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -562,7 +562,7 @@ func TestOverLimitWithLocalCacheShadowRule(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint64(1)).SetArg(1, uint64(16)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) // The result should be OK since limit is in ShadowMode assert.Equal( @@ -624,7 +624,7 @@ func TestRedisTracer(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key_value_1234", uint64(1)).SetArg(1, uint64(5)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(1)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key_value"), false, false, false, "", nil, false)} @@ -664,7 +664,7 @@ func TestOverLimitWithStopCacheKeyIncrementWhenOverlimitConfig(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key5_value5_997200", uint64(1)).SetArg(1, uint64(11)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key5_value5_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil).Times(2) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil).Times(2) request := common.NewRateLimitRequestWithPerDescriptorHitsAddend("domain", [][][2]string{{{"key4", "value4"}}, {{"key5", "value5"}}}, []uint64{1, 1}) @@ -703,7 +703,7 @@ func TestOverLimitWithStopCacheKeyIncrementWhenOverlimitConfig(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key5_value5_997200", uint64(2)).SetArg(1, uint64(13)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key5_value5_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil).Times(2) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil).Times(2) request = common.NewRateLimitRequestWithPerDescriptorHitsAddend("domain", [][][2]string{{{"key4", "value4"}}, {{"key5", "value5"}}}, []uint64{2, 2}) @@ -737,7 +737,7 @@ func TestOverLimitWithStopCacheKeyIncrementWhenOverlimitConfig(t *testing.T) { client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key5_value5_997200", uint64(2)).SetArg(1, uint64(15)).DoAndReturn(pipeAppend) client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key5_value5_997200", int64(3600)).DoAndReturn(pipeAppend) - client.EXPECT().PipeDo(gomock.Any()).Return(nil).Times(2) + client.EXPECT().PipeDo(gomock.Any(), gomock.Any()).Return(nil).Times(2) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{