Skip to content
14 changes: 14 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,20 @@ OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# Return strings instead of bytes (recommended: true)
# REDIS_DECODE_RESPONSES=true

# =============================================================================
# Redis Cluster Mode
# =============================================================================

# Enable Redis Cluster support for deployments using Redis Cluster (e.g.
# Bitnami redis-cluster Helm chart). When enabled, the client uses
# redis.asyncio.RedisCluster which handles MOVED/ASK redirects automatically.
# /0 in REDIS_URL is silently stripped in cluster mode; a non-zero database
# (e.g. /1) raises a startup error because Redis Cluster only supports db 0.
# NOTE: async RedisCluster does not support publish()/pubsub(), so cross-worker
# cache invalidation and session broadcasting are not available in cluster mode.
# Default: false
# REDIS_CLUSTER_MODE=false

# =============================================================================
# Redis Parser Configuration (Performance - ADR-026)
# =============================================================================
Expand Down
1 change: 1 addition & 0 deletions charts/mcp-stack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ mcpContextForge:
REDIS_RETRY_ON_TIMEOUT: "true" # Retry commands on timeout
REDIS_HEALTH_CHECK_INTERVAL: "30" # Health check interval (seconds, 0=disabled)
REDIS_DECODE_RESPONSES: "true" # Return strings instead of bytes
REDIS_CLUSTER_MODE: "false" # Use RedisCluster client for cluster deployments (handles MOVED/ASK redirects)

# ─ Redis leader election (multi-node) ─
REDIS_LEADER_TTL: "15" # Leader TTL (seconds)
Expand Down
12 changes: 9 additions & 3 deletions mcpgateway/cache/session_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,12 @@ async def initialize(self) -> None:
# Get shared Redis client from factory
self._redis = await get_redis_client()
if self._redis:
self._pubsub = self._redis.pubsub()
await self._pubsub.subscribe("mcp_session_events")
logger.info("Session registry connected to shared Redis client")
if hasattr(self._redis, "pubsub"):
self._pubsub = self._redis.pubsub()
await self._pubsub.subscribe("mcp_session_events")
logger.info("Session registry connected to shared Redis client")
else:
logger.warning("Session registry: Redis client does not support pubsub (cluster mode); cross-worker session events disabled")

elif self._backend == "none":
# Nothing to initialize for none backend
Expand Down Expand Up @@ -1526,6 +1529,9 @@ async def respond(
if not self._redis:
logger.warning(f"Redis client not initialized, cannot respond to {session_id}")
return
if not hasattr(self._redis, "pubsub"):
logger.warning(f"Redis client does not support pubsub (cluster mode), cannot respond to {session_id}")
return
pubsub = self._redis.pubsub()
await pubsub.subscribe(session_id)

Expand Down
8 changes: 8 additions & 0 deletions mcpgateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,14 @@ def parse_issuers(cls, v: Any) -> list[str]:
description="Redis protocol parser: auto (use hiredis if available), hiredis (require hiredis), python (pure-Python)",
)

# Redis Cluster Mode
redis_cluster_mode: bool = Field(
default=False,
description="Use RedisCluster client for Redis Cluster deployments. "
"When enabled, the client handles MOVED/ASK redirects automatically. "
"The REDIS_URL must not include a database number (e.g. /0) in cluster mode.",
)

# Redis Connection Pool - Performance Optimized
redis_decode_responses: bool = Field(default=True, description="Return strings instead of bytes")
redis_max_connections: int = Field(default=50, description="Connection pool size per worker")
Expand Down
186 changes: 162 additions & 24 deletions mcpgateway/utils/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
This module provides a single source of truth for Redis client creation,
ensuring all services use the same connection pool and settings.

Supports both standalone Redis and Redis Cluster deployments. When
``REDIS_CLUSTER_MODE=true``, the factory creates a
``redis.asyncio.RedisCluster`` client that handles MOVED/ASK redirects
automatically. Otherwise it creates a standard ``redis.asyncio.Redis``
client via ``from_url``.

Performance: Uses hiredis C parser by default (ADR-026) for up to 83x faster
response parsing on large responses. Falls back to pure-Python parser if
hiredis is unavailable or explicitly disabled via REDIS_PARSER setting.
Expand All @@ -25,6 +31,7 @@
# Standard
import logging
from typing import Any, Optional
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -84,9 +91,140 @@ def _get_async_parser_class(parser_setting: str) -> tuple[Any, str]:
return None, "AsyncRESP2Parser (pure-Python, auto-detected)"


def _strip_db_from_url(url: str) -> str:
"""Remove the database number from a Redis URL for cluster mode.

Redis Cluster only supports database 0. If the URL contains ``/0`` it is
silently stripped. If it contains a non-zero database (``/1``, ``/2``, …)
a ``ValueError`` is raised so that misconfigurations fail fast instead of
being silently ignored.

Args:
url: Redis connection URL, e.g. ``redis://:pass@host:6379/0``

Returns:
str: URL without the trailing database path.

Raises:
ValueError: If the URL specifies a non-zero database number.

Examples:
>>> _strip_db_from_url("redis://:pass@host:6379/0")
'redis://:pass@host:6379'
>>> _strip_db_from_url("redis://:pass@host:6379")
'redis://:pass@host:6379'
>>> _strip_db_from_url("redis://:pass@host:6379/1")
Traceback (most recent call last):
...
ValueError: Redis Cluster only supports database 0, but REDIS_URL specifies /1. Remove the database selector or use /0.
"""
parsed = urlparse(url)
if parsed.path and parsed.path not in ("", "/"):
db_str = parsed.path.lstrip("/")
if db_str and db_str != "0":
raise ValueError(f"Redis Cluster only supports database 0, but REDIS_URL " f"specifies /{db_str}. Remove the database selector or use /0.")
cleaned = parsed._replace(path="")
return cleaned.geturl()
return url


def _mask_redis_url(url: str) -> str:
"""Mask credentials in a Redis URL for safe logging.

Replaces the password portion of the URL with ``***`` so that
connection details can be logged without leaking secrets.

Args:
url: Redis connection URL, e.g. ``redis://:secret@host:6379``

Returns:
str: URL with password replaced by ``***``.

Examples:
>>> _mask_redis_url("redis://:secret@host:6379")
'redis://:***@host:6379'
>>> _mask_redis_url("redis://host:6379")
'redis://host:6379'
"""
parsed = urlparse(url)
if parsed.password:
# Replace password in netloc
masked_netloc = parsed.netloc.replace(f":{parsed.password}@", ":***@", 1)
return parsed._replace(netloc=masked_netloc).geturl()
return url


async def _create_cluster_client(settings: Any, aioredis: Any) -> Any:
"""Create a ``redis.asyncio.RedisCluster`` client.

Args:
settings: Application settings object.
aioredis: The ``redis.asyncio`` module.

Returns:
An initialised ``RedisCluster`` async client.

Note:
``RedisCluster`` does not accept ``retry_on_timeout``,
``parser_class``, or ``single_connection_client``. It does
accept ``max_connections`` and ``health_check_interval``.
``publish()`` and ``pubsub()`` are **not** available on the
async cluster client (redis-py 7.x).
"""
url = _strip_db_from_url(settings.redis_url)

cluster_kwargs: dict[str, Any] = {
"decode_responses": settings.redis_decode_responses,
"max_connections": settings.redis_max_connections,
"socket_timeout": settings.redis_socket_timeout,
"socket_connect_timeout": settings.redis_socket_connect_timeout,
"health_check_interval": settings.redis_health_check_interval,
"encoding": "utf-8",
}

client = aioredis.RedisCluster.from_url(url, **cluster_kwargs)
await client.ping()
return client


async def _create_standalone_client(settings: Any, aioredis: Any, parser_class: Any) -> Any:
"""Create a standard ``redis.asyncio.Redis`` client.

Args:
settings: Application settings object.
aioredis: The ``redis.asyncio`` module.
parser_class: Optional parser class override (or *None* for auto).

Returns:
An initialised ``Redis`` async client.
"""
connection_kwargs: dict[str, Any] = {
"decode_responses": settings.redis_decode_responses,
"max_connections": settings.redis_max_connections,
"socket_timeout": settings.redis_socket_timeout,
"socket_connect_timeout": settings.redis_socket_connect_timeout,
"retry_on_timeout": settings.redis_retry_on_timeout,
"health_check_interval": settings.redis_health_check_interval,
"encoding": "utf-8",
"single_connection_client": False,
}

if parser_class is not None:
connection_kwargs["parser_class"] = parser_class

client = aioredis.from_url(settings.redis_url, **connection_kwargs)
await client.ping()
return client


async def get_redis_client() -> Optional[Any]:
"""Get or create the shared async Redis client.

When ``REDIS_CLUSTER_MODE`` is enabled the factory returns a
``redis.asyncio.RedisCluster`` instance that transparently handles
MOVED/ASK redirects across cluster shards. Otherwise a standard
``redis.asyncio.Redis`` client is returned.

Uses hiredis C parser by default for up to 83x faster response parsing.
Parser selection controlled by REDIS_PARSER setting (auto/hiredis/python).

Expand Down Expand Up @@ -130,34 +268,34 @@ async def get_redis_client() -> Optional[Any]:
_initialized = True
return None

# Validate cluster URL before attempting connection so
# misconfigurations (e.g. /1) crash immediately instead of
# being silently swallowed by the generic except below.
if settings.redis_cluster_mode:
_strip_db_from_url(settings.redis_url)

try:
# Get parser configuration (ADR-026)
parser_class, _parser_info = _get_async_parser_class(settings.redis_parser)

# Build connection kwargs
connection_kwargs: dict[str, Any] = {
"decode_responses": settings.redis_decode_responses,
"max_connections": settings.redis_max_connections,
"socket_timeout": settings.redis_socket_timeout,
"socket_connect_timeout": settings.redis_socket_connect_timeout,
"retry_on_timeout": settings.redis_retry_on_timeout,
"health_check_interval": settings.redis_health_check_interval,
"encoding": "utf-8",
"single_connection_client": False,
}

# Only specify parser_class if explicitly set (not auto)
if parser_class is not None:
connection_kwargs["parser_class"] = parser_class

_client = aioredis.from_url(settings.redis_url, **connection_kwargs)
await _client.ping()
logger.info(
f"Redis client initialized: parser={_parser_info}, "
f"pool_size={settings.redis_max_connections}, "
f"timeout={settings.redis_socket_timeout}s, "
f"health_check={settings.redis_health_check_interval}s"
)
if settings.redis_cluster_mode:
_client = await _create_cluster_client(settings, aioredis)
masked_url = _mask_redis_url(_strip_db_from_url(settings.redis_url))
logger.info(f"Redis Cluster client initialized: parser={_parser_info}, " f"timeout={settings.redis_socket_timeout}s, " f"url={masked_url}")
logger.warning(
"Redis Cluster mode: async RedisCluster does not support "
"publish()/pubsub(). Cross-worker cache invalidation, "
"session broadcasting, and cancellation propagation will "
"not function until those callers are migrated."
)
else:
_client = await _create_standalone_client(settings, aioredis, parser_class)
logger.info(
f"Redis client initialized: parser={_parser_info}, "
f"pool_size={settings.redis_max_connections}, "
f"timeout={settings.redis_socket_timeout}s, "
f"health_check={settings.redis_health_check_interval}s"
)
except ImportError as e:
logger.error(f"Redis parser configuration error: {e}")
_client = None
Expand Down
Loading