Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ For high throughput scenarios, ratelimit supports write buffering via [radix v4'

1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: controls how often buffered writes are flushed to the network connection. When set to a non-zero value (e.g., 150us-500us), radix v4 will buffer multiple concurrent write operations and flush them together, reducing system calls and improving throughput. If zero, each write is flushed immediately. **Required for Redis Cluster mode.**
1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: **DEPRECATED** - These settings have no effect in radix v4. Write buffering is controlled solely by the window settings above.
1. `REDIS_CLUSTER_PIPELINE_PARALLELISM` & `REDIS_PERSECOND_CLUSTER_PIPELINE_PARALLELISM`: controls per-key pipeline group concurrency in Redis Cluster mode. Default: `1`, which preserves the legacy serial behavior. Set to `0` for auto parallelism bounded by the corresponding Redis pool size, or a value greater than `1` to bound concurrent Redis group calls per pipeline execution. Configured values greater than the corresponding Redis pool size are capped to the pool size.

Write buffering is disabled by default (window = 0). For optimal performance, set `REDIS_PIPELINE_WINDOW` to 150us-500us depending on your latency requirements and load patterns.

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
golang.org/x/net v0.52.0
golang.org/x/sync v0.20.0
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
12 changes: 8 additions & 4 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ func NewRateLimiterCacheImplFromSettings(ctx context.Context, s settings.Setting
closer := &utils.MultiCloser{}
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(ctx, srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
perSecondPool = newClientImpl(ctx, srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout,
s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth, s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime)
s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondSentinelAuth,
s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime,
s.RedisPerSecondClusterPipelineParallelism)
closer.Closers = append(closer.Closers, perSecondPool)
}

otherPool := NewClientImpl(ctx, srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
otherPool := newClientImpl(ctx, srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout,
s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth, s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime)
s.RedisPoolOnEmptyBehavior, s.RedisSentinelAuth,
s.RedisStartupInitialInterval, s.RedisStartupMaxInterval, s.RedisStartupMaxElapsedTime,
s.RedisClusterPipelineParallelism)
closer.Closers = append(closer.Closers, otherPool)

return NewFixedRateLimitCacheImpl(
Expand Down
9 changes: 7 additions & 2 deletions src/redis/driver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package redis

import "github.com/mediocregopher/radix/v4"
import (
"context"

"github.com/mediocregopher/radix/v4"
)

// Errors that may be raised during config parsing.
type RedisError string
Expand Down Expand Up @@ -32,8 +36,9 @@ type Client interface {
// a single write, then reads their responses in a single read. This reduces
// network delay into a single round-trip.
//
// @param ctx supplies the context for Redis I/O.
// @param pipeline supplies the queue for pending commands.
PipeDo(pipeline Pipeline) error
PipeDo(ctx context.Context, pipeline Pipeline) error

// Once Close() is called all future method calls on the Client will return
// an error
Expand Down
137 changes: 108 additions & 29 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/mediocregopher/radix/v4"
"github.com/mediocregopher/radix/v4/trace"
logger "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/envoyproxy/ratelimit/src/server"
"github.com/envoyproxy/ratelimit/src/utils"
Expand Down Expand Up @@ -69,9 +70,10 @@ type redisClient interface {
}

type clientImpl struct {
client redisClient
stats poolStats
isCluster bool
client redisClient
stats poolStats
isCluster bool
clusterPipelineParallelism int
}

func checkError(err error) {
Expand All @@ -80,6 +82,27 @@ func checkError(err error) {
}
}

func effectiveClusterPipelineParallelism(configuredParallelism, poolSize int) int {
if configuredParallelism < 0 {
panic(RedisError("redis cluster pipeline parallelism must be >= 0"))
}

if configuredParallelism == 1 {
return 1
}

poolCeiling := poolSize
if poolCeiling < 1 {
poolCeiling = 1
}

if configuredParallelism == 0 || configuredParallelism > poolCeiling {
return poolCeiling
}

return configuredParallelism
}

// createDialer creates a radix.Dialer with timeout, TLS, and auth configuration
// targetName is used for logging to identify the connection target (e.g., URL, "sentinel(url)")
func createDialer(timeout time.Duration, useTls bool, tlsConfig *tls.Config, auth string, targetName string) radix.Dialer {
Expand Down Expand Up @@ -124,6 +147,18 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string,
startupInitialInterval, startupMaxInterval, startupMaxElapsedTime time.Duration,
) Client {
return newClientImpl(ctx, scope, useTls, auth, redisSocketType, redisType, url, poolSize,
pipelineWindow, pipelineLimit, tlsConfig, healthCheckActiveConnection, srv,
timeout, poolOnEmptyBehavior, sentinelAuth,
startupInitialInterval, startupMaxInterval, startupMaxElapsedTime, 1)
}

func newClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
timeout time.Duration, poolOnEmptyBehavior string, sentinelAuth string,
startupInitialInterval, startupMaxInterval, startupMaxElapsedTime time.Duration,
clusterPipelineParallelism int,
) Client {
maskedUrl := utils.MaskCredentialsInUrl(url)
logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize)
Expand Down Expand Up @@ -156,6 +191,21 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re
logger.Debugf("Cluster mode: setting WriteFlushInterval to %v", pipelineWindow)
}

effectivePipelineParallelism := clusterPipelineParallelism
if isCluster {
effectivePipelineParallelism = effectiveClusterPipelineParallelism(clusterPipelineParallelism, poolSize)
switch {
case clusterPipelineParallelism == 0:
logger.Warnf("Redis cluster pipeline parallelism: auto bounded to Redis pool size (%d concurrent groups)", effectivePipelineParallelism)
case clusterPipelineParallelism != effectivePipelineParallelism:
logger.Warnf("Redis cluster pipeline parallelism: configured value %d exceeds Redis pool size %d; bounded to %d concurrent groups", clusterPipelineParallelism, poolSize, effectivePipelineParallelism)
case effectivePipelineParallelism == 1:
logger.Warnf("Redis cluster pipeline parallelism: disabled (serial legacy behavior)")
default:
logger.Warnf("Redis cluster pipeline parallelism: bounded to %d concurrent groups", effectivePipelineParallelism)
}
}

// IMPORTANT: radix v4 pool behavior changes from v3
//
// v4 uses a FIXED pool size and BLOCKS when all connections are in use.
Expand Down Expand Up @@ -278,9 +328,10 @@ func NewClientImpl(ctx context.Context, scope stats.Scope, useTls bool, auth, re
}

return &clientImpl{
client: client,
stats: stats,
isCluster: isCluster,
client: client,
stats: stats,
isCluster: isCluster,
clusterPipelineParallelism: effectivePipelineParallelism,
}
}

Expand Down Expand Up @@ -312,8 +363,7 @@ func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key str
})
}

func (c *clientImpl) PipeDo(pipeline Pipeline) error {
ctx := context.Background()
func (c *clientImpl) PipeDo(ctx context.Context, pipeline Pipeline) error {
if c.isCluster {
// Cluster mode: group commands by key and execute each group as a pipeline.
// This ensures INCRBY + EXPIRE for the same key are pipelined together (same slot),
Expand All @@ -329,13 +379,25 @@ func (c *clientImpl) PipeDo(pipeline Pipeline) error {
return c.client.Do(ctx, p)
}

// executeGroupedPipeline groups pipeline actions by key and executes each group
// as a separate pipeline. This allows same-key commands (like INCRBY + EXPIRE)
// to be pipelined together even in cluster mode.
// executeGroupedPipeline routes a pipeline of Redis actions in cluster mode
// via a three-tier dispatch:
//
// 1. Single-action fast path (len==1): skip grouping entirely.
// 2. Serial compatibility path: clusterPipelineParallelism == 1 preserves
// the pre-parallelization behavior.
// 3. General path: group actions by key (same-key commands like INCRBY +
// EXPIRE are still pipelined together) and execute groups concurrently
// via errgroup with clusterPipelineParallelism as the max concurrent group
// count.
func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeline) error {
// Group actions by key, preserving first-occurrence order
var groups [][]radix.Action
keyToIndex := make(map[string]int)
// Tier 1: single action — skip grouping, skip map alloc.
if len(pipeline) == 1 {
return c.client.Do(ctx, pipeline[0].Action)
}

// Tier 2: group by key, preserving first-occurrence order.
groups := make([][]radix.Action, 0, len(pipeline))
keyToIndex := make(map[string]int, len(pipeline))

for _, pa := range pipeline {
if idx, exists := keyToIndex[pa.Key]; exists {
Expand All @@ -346,23 +408,40 @@ func (c *clientImpl) executeGroupedPipeline(ctx context.Context, pipeline Pipeli
}
}

// Execute each group
if c.clusterPipelineParallelism == 1 {
return c.doPipelineGroupsSerial(ctx, groups)
}

// Execute groups concurrently.
eg, egCtx := errgroup.WithContext(ctx)
if c.clusterPipelineParallelism > 1 {
eg.SetLimit(c.clusterPipelineParallelism)
}
for _, actions := range groups {
if len(actions) == 1 {
if err := c.client.Do(ctx, actions[0]); err != nil {
return err
}
} else {
// Multiple commands for same key: pipeline them together
p := radix.NewPipeline()
for _, action := range actions {
p.Append(action)
}
if err := c.client.Do(ctx, p); err != nil {
return err
}
}
actions := actions
eg.Go(func() error {
return c.doPipelineGroup(egCtx, actions)
})
}
return eg.Wait()
}

func (c *clientImpl) doPipelineGroupsSerial(ctx context.Context, groups [][]radix.Action) error {
for _, actions := range groups {
if err := c.doPipelineGroup(ctx, actions); err != nil {
return err
}
}
return nil
}

func (c *clientImpl) doPipelineGroup(ctx context.Context, actions []radix.Action) error {
if len(actions) == 1 {
return c.client.Do(ctx, actions[0])
}
p := radix.NewPipeline()
for _, action := range actions {
p.Append(action)
}
return c.client.Do(ctx, p)
}
Loading
Loading