Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ Run multiple `TimersBroker` processes against the same Redis keys. The Lua
claim script ensures each due timer is leased by exactly one worker at a
time; failover is automatic via lease expiry.

Supports single-primary Redis, including Sentinel-managed primary/replica
setups. Redis Cluster is **not** supported: each topic's timeline and
payload keys must live on the same node, and the broker raises a
`TypeError` if constructed with a `RedisCluster` client.

```python
broker = TimersBroker(
Redis.from_url("redis://..."),
Expand Down
8 changes: 8 additions & 0 deletions faststream_redis_timers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from faststream.response.publish_type import PublishType
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
from redis.asyncio.cluster import RedisCluster

from faststream_redis_timers.configs import ConnectionState, RedisClient, TimersBrokerConfig
from faststream_redis_timers.message import TimerMessage
Expand Down Expand Up @@ -99,6 +100,13 @@ def __init__( # noqa: PLR0913
description: str | None = None,
tags: Iterable[Tag | TagDict] = (),
) -> None:
if isinstance(client, RedisCluster):
msg = (
"TimersBroker does not support RedisCluster clients: the timeline and "
"payload keys for a topic must live on the same node. Use a single-primary "
"Redis (Sentinel-managed primary/replica setups are supported)."
)
raise TypeError(msg)
fd_config = FastDependsConfig(use_fastdepends=apply_types)
connection = ConnectionState(client)
broker_config = TimersBrokerConfig(
Expand Down
7 changes: 7 additions & 0 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import faststream.asgi.factories.asyncapi.try_it_out
import pytest
from faststream.exceptions import IncorrectState
from redis.asyncio.cluster import RedisCluster
from redis.exceptions import NoScriptError

from faststream_redis_timers import TestTimersBroker, TimersBroker
Expand Down Expand Up @@ -56,6 +57,12 @@ async def test_broker_config_connect_and_disconnect() -> None:
await config.disconnect()


def test_broker_rejects_redis_cluster_client() -> None:
fake_cluster = MagicMock(spec=RedisCluster)
with pytest.raises(TypeError, match="RedisCluster"):
TimersBroker(fake_cluster)


# --- TimersBroker.ping ---


Expand Down
Loading