From cf2c4a54e6ea942095f8e0fa9a5a1c3aed5dcef6 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 16:17:42 +0300 Subject: [PATCH] Reject RedisCluster clients with a clear error Raises TypeError at TimersBroker construction when handed a RedisCluster client. The default key layout uses two keys per topic without a hash tag, so multi-key Lua scripts and pipelines would fail with CROSSSLOT in Cluster mode. Surfacing the limitation up front is far easier to diagnose than a later runtime CROSSSLOT from inside a Lua eval. Single-primary Redis (including Sentinel) is unchanged. --- README.md | 5 +++++ faststream_redis_timers/broker.py | 8 ++++++++ tests/test_unit.py | 7 +++++++ 3 files changed, 20 insertions(+) diff --git a/README.md b/README.md index 6704b17..288beeb 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,11 @@ Run multiple `TimersBroker` processes against the same Redis keys. The Lua claim script ensures each due timer is leased by exactly one worker at a time; failover is automatic via lease expiry. +Supports single-primary Redis, including Sentinel-managed primary/replica +setups. Redis Cluster is **not** supported: each topic's timeline and +payload keys must live on the same node, and the broker raises a +`TypeError` if constructed with a `RedisCluster` client. + ```python broker = TimersBroker( Redis.from_url("redis://..."), diff --git a/faststream_redis_timers/broker.py b/faststream_redis_timers/broker.py index c081e2e..e1346c0 100644 --- a/faststream_redis_timers/broker.py +++ b/faststream_redis_timers/broker.py @@ -18,6 +18,7 @@ from faststream.response.publish_type import PublishType from faststream.specification.schema import BrokerSpec from faststream.specification.schema.extra import Tag, TagDict +from redis.asyncio.cluster import RedisCluster from faststream_redis_timers.configs import ConnectionState, RedisClient, TimersBrokerConfig from faststream_redis_timers.message import TimerMessage @@ -99,6 +100,13 @@ def __init__( # noqa: PLR0913 description: str | None = None, tags: Iterable[Tag | TagDict] = (), ) -> None: + if isinstance(client, RedisCluster): + msg = ( + "TimersBroker does not support RedisCluster clients: the timeline and " + "payload keys for a topic must live on the same node. Use a single-primary " + "Redis (Sentinel-managed primary/replica setups are supported)." + ) + raise TypeError(msg) fd_config = FastDependsConfig(use_fastdepends=apply_types) connection = ConnectionState(client) broker_config = TimersBrokerConfig( diff --git a/tests/test_unit.py b/tests/test_unit.py index 6c7826a..1901551 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -6,6 +6,7 @@ import faststream.asgi.factories.asyncapi.try_it_out import pytest from faststream.exceptions import IncorrectState +from redis.asyncio.cluster import RedisCluster from redis.exceptions import NoScriptError from faststream_redis_timers import TestTimersBroker, TimersBroker @@ -56,6 +57,12 @@ async def test_broker_config_connect_and_disconnect() -> None: await config.disconnect() +def test_broker_rejects_redis_cluster_client() -> None: + fake_cluster = MagicMock(spec=RedisCluster) + with pytest.raises(TypeError, match="RedisCluster"): + TimersBroker(fake_cluster) + + # --- TimersBroker.ping ---