Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/introduction/how-it-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,10 @@ The default ack policy is `NACK_ON_ERROR`: the timer is acknowledged on success,
| `polling_interval` | `0.05` s | Base poll interval used when the queue has work or just transitioned from idle |
| `max_polling_interval` | `5.0` s | Ceiling for the adaptive idle backoff — `polling_interval` doubles up to this value on consecutive empty polls. Worst-case delivery latency on a previously-idle queue is `max_polling_interval × 1.5` (with ±50% jitter) |
| `max_concurrent` | `5` | Max handlers running concurrently per subscriber; also bounds fetch batch size |
| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim |
| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim. Set to ~3–5× the P99 handler runtime: lower values speed up recovery from worker death, higher values tolerate handler GC pauses and clock skew |

## Operational requirements

`faststream-redis-timers` is designed for a **single-primary Redis** (Sentinel-managed primary/replica setups are supported; Redis Cluster is not — see the broker constructor docstring).

Multiple brokers polling the same Redis derive timer due-times and lease deadlines from each broker's local wall clock. Keep all broker hosts NTP-synchronised: clock skew larger than `lease_ttl` between brokers can cause a clock-fast broker to re-lease a timer that another broker is still actively processing, producing duplicate delivery to handlers. The default `lease_ttl=30s` tolerates seconds of NTP drift; tune `lease_ttl` upward if your environment cannot guarantee sub-second sync.
1 change: 1 addition & 0 deletions docs/usage/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Each `ScheduledTimer` has `topic`, `timer_id`, `activate_at`, `body`, `correlati
- `publisher.fetch_redis_timers()` always returns `[]` in tests — timers are delivered instantly, so there are never any pending entries.
- The inspection / bulk-cancel methods follow the same "nothing pending" contract: `broker.has_pending(...)` returns `False`, `broker.get_pending_timers(...)` returns `[]`, and `broker.cancel_all(...)` returns `0`. Use `TestTimersBroker.scheduled_timers` to assert *what was published*; the inspection API is for production introspection.
- The fake producer uses the same envelope format as the real one, so all serialization paths are exercised.
- `lease_ttl` and re-delivery are **not** simulated — handlers that exceed the configured TTL in production may be re-delivered to another worker, but tests will only invoke the handler once. Idempotency must be verified separately.

## pytest-asyncio configuration

Expand Down
15 changes: 13 additions & 2 deletions faststream_redis_timers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
from faststream._internal.context.repository import ContextRepo


def _require_topic(topic: str) -> None:
if not topic:
msg = "topic must be a non-empty string"
raise ValueError(msg)


class TimersParamsStorage(DefaultLoggerStorage):
__max_msg_id_ln = -1
_max_channel_name = 7
Expand Down Expand Up @@ -176,15 +182,16 @@ async def ping(self, timeout: float | None = None) -> bool:

async def publish( # noqa: PLR0913
self,
message: "SendableMessage" = None,
topic: str = "",
message: "SendableMessage",
topic: str,
*,
timer_id: str = "",
activate_in: timedelta = timedelta(0),
activate_at: datetime | None = None,
correlation_id: str | None = None,
headers: dict[str, typing.Any] | None = None,
) -> str:
_require_topic(topic)
if not timer_id:
timer_id = gen_cor_id()
cmd = TimerPublishCommand(
Expand All @@ -201,12 +208,14 @@ async def publish( # noqa: PLR0913

async def cancel_timer(self, topic: str, timer_id: str) -> None:
    """Cancel a pending timer. No-op if the timer has already fired or does not exist."""
    _require_topic(topic)
    # The producer works on prefixed topic names; apply the broker-wide prefix here.
    prefix = self.config.broker_config.prefix
    producer = typing.cast("TimersProducer", self.config.broker_config.producer)
    await producer.cancel(f"{prefix}{topic}", timer_id)

async def has_pending(self, topic: str, timer_id: str) -> bool:
    """Return True if a timer with this ID is still pending on *topic*."""
    _require_topic(topic)
    # Pending timers live in a sorted set keyed by timer_id; a missing member
    # yields a None score, which means the timer fired or was cancelled.
    redis = self.config.broker_config.connection.client
    return await redis.zscore(self._topic_timeline_key(topic), timer_id) is not None
Expand All @@ -219,6 +228,7 @@ async def get_pending_timers(self, topic: str, before: datetime | None = None) -
into the future, so they appear in the default (``before=None``) result but are excluded
once *before* is set to the current time.
"""
_require_topic(topic)
client = self.config.broker_config.connection.client
score_max: str | float = before.timestamp() if before is not None else "+inf"
raw_ids: list[bytes] | list[str] = await client.zrangebyscore(
Expand All @@ -233,6 +243,7 @@ async def cancel_all(self, topic: str) -> int:
Handlers already executing for a leased timer continue to run to completion;
their final commit is a no-op because the keys are gone.
"""
_require_topic(topic)
client = self.config.broker_config.connection.client
timeline_key = self._topic_timeline_key(topic)
payloads_key = self._topic_payloads_key(topic)
Expand Down
18 changes: 15 additions & 3 deletions faststream_redis_timers/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def _consume(self, client: "RedisClient", *, start_signal: anyio.Event) ->
while self.running:
try:
fetched = await self._get_msgs(client, tg, limiter)
except Exception as e: # noqa: BLE001 # pragma: no cover
except Exception as e: # noqa: BLE001
self._log(log_level=logging.ERROR, message=f"Message fetch error: {e!r}", exc_info=e)
error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP)
delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311
Expand Down Expand Up @@ -151,11 +151,23 @@ async def _claim_and_consume(
limiter: anyio.CapacityLimiter,
client: "RedisClient",
) -> None:
try:
timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
except UnicodeDecodeError as e:
self._log(
log_level=logging.WARNING,
message=f"Dropping timer with non-UTF-8 id {raw_id!r}: {e!r}",
)
async with client.pipeline(transaction=True) as pipe:
pipe.zrem(self._config.topic_timeline_key, raw_id)
pipe.hdel(self._config.topic_payloads_key, raw_id)
await pipe.execute()
return

try:
async with limiter:
now = time.time()
claim_score = now + lease_ttl
timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
raw_payload: bytes | None = await eval_cached(
client,
CLAIM_LUA,
Expand All @@ -181,7 +193,7 @@ async def _claim_and_consume(
)
self._log(log_level=logging.DEBUG, message=f"Timer {timer_id!r} delivered to handler")
await self.consume(msg)
except Exception as e: # noqa: BLE001 # pragma: no cover
except Exception as e: # noqa: BLE001
self._log(
log_level=logging.ERROR,
message=f"Timer {raw_id!r} consume error: {e!r}",
Expand Down
117 changes: 116 additions & 1 deletion tests/test_unit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import logging
import warnings
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import faststream.asgi.factories.asyncapi.try_it_out
import pytest
from faststream.exceptions import IncorrectState
Expand Down Expand Up @@ -300,6 +302,119 @@ async def handler(body: str) -> None: ...
await asyncio.sleep(0.05)


# --- topic must be non-empty (B8) ---


async def test_publish_rejects_empty_topic() -> None:
    """publish() must fail fast on an empty topic, before touching the producer."""
    timers_broker = TimersBroker(AsyncMock())
    with pytest.raises(ValueError, match="topic must be a non-empty string"):
        await timers_broker.publish("msg", topic="")


async def test_cancel_timer_rejects_empty_topic() -> None:
    """cancel_timer() must fail fast on an empty topic."""
    timers_broker = TimersBroker(AsyncMock())
    with pytest.raises(ValueError, match="topic must be a non-empty string"):
        await timers_broker.cancel_timer("", "id")


async def test_has_pending_rejects_empty_topic() -> None:
    """has_pending() must fail fast on an empty topic."""
    timers_broker = TimersBroker(AsyncMock())
    with pytest.raises(ValueError, match="topic must be a non-empty string"):
        await timers_broker.has_pending("", "id")


async def test_get_pending_timers_rejects_empty_topic() -> None:
    """get_pending_timers() must fail fast on an empty topic."""
    timers_broker = TimersBroker(AsyncMock())
    with pytest.raises(ValueError, match="topic must be a non-empty string"):
        await timers_broker.get_pending_timers("")


async def test_cancel_all_rejects_empty_topic() -> None:
    """cancel_all() must fail fast on an empty topic."""
    timers_broker = TimersBroker(AsyncMock())
    with pytest.raises(ValueError, match="topic must be a non-empty string"):
        await timers_broker.cancel_all("")


# --- error path log format (B6) ---


async def test_consume_logs_get_msgs_error_with_repr() -> None:
    """A Redis error during _get_msgs is logged at ERROR with `{e!r}` so the type and msg survive Sentry truncation."""
    redis_client = AsyncMock()
    redis_client.ping.return_value = True
    redis_client.zrangebyscore.side_effect = ConnectionError("redis down")
    timers_broker = TimersBroker(redis_client, start_timeout=2.0)

    @timers_broker.subscriber("topic", polling_interval=0.05)
    async def handler(body: str) -> None: ...

    subscriber = next(iter(timers_broker._subscribers))  # noqa: SLF001
    captured: list[dict[str, object]] = []
    subscriber._log = MagicMock(side_effect=lambda **kwargs: captured.append(kwargs))  # noqa: SLF001 # ty: ignore[invalid-assignment]

    async with timers_broker:
        await asyncio.sleep(0.1)

    errors = [call for call in captured if call.get("log_level") == logging.ERROR]
    assert errors, "expected at least one ERROR log"
    logged = errors[0]["message"]
    assert isinstance(logged, str)
    # Both the static prefix and the repr (type + message) must be present.
    assert "Message fetch error" in logged
    assert "ConnectionError('redis down')" in logged


async def test_claim_and_consume_logs_unhandled_error_with_repr() -> None:
    """An unhandled error inside the limiter block is logged with `{raw_id!r}` and `{e!r}`."""
    redis_client = AsyncMock()
    timers_broker = TimersBroker(redis_client)
    subscriber = timers_broker.subscriber("topic")
    await timers_broker.connect()

    captured: list[dict[str, object]] = []
    subscriber._log = MagicMock(side_effect=lambda **kwargs: captured.append(kwargs))  # noqa: SLF001 # ty: ignore[invalid-assignment]

    exploding_eval = AsyncMock(side_effect=RuntimeError("boom"))
    with patch(
        "faststream_redis_timers.subscriber.usecase.eval_cached",
        new=exploding_eval,
    ):
        await subscriber._claim_and_consume(b"timer-1", 30, anyio.CapacityLimiter(1), redis_client)  # noqa: SLF001

    errors = [call for call in captured if call.get("log_level") == logging.ERROR]
    assert errors
    logged = errors[0]["message"]
    assert isinstance(logged, str)
    # repr of the raw id and of the exception must both survive formatting.
    assert "b'timer-1'" in logged
    assert "RuntimeError('boom')" in logged


# --- non-UTF-8 timer id recovery (B2) ---


async def test_claim_and_consume_drops_non_utf8_id() -> None:
    """A non-UTF-8 raw_id is removed from both keys so polls recover instead of looping forever."""
    fake_pipe = MagicMock()
    fake_pipe.__aenter__ = AsyncMock(return_value=fake_pipe)
    fake_pipe.__aexit__ = AsyncMock(return_value=None)
    fake_pipe.execute = AsyncMock(return_value=[1, 1])
    redis_client = AsyncMock()
    redis_client.pipeline = MagicMock(return_value=fake_pipe)

    timers_broker = TimersBroker(redis_client)
    subscriber = timers_broker.subscriber("topic")
    await timers_broker.connect()

    undecodable_id = b"\xff\xfe-broken"
    await subscriber._claim_and_consume(undecodable_id, 30, anyio.CapacityLimiter(1), redis_client)  # noqa: SLF001

    # Both the timeline sorted-set entry and the payload hash field must be purged.
    fake_pipe.zrem.assert_called_once_with(subscriber._config.topic_timeline_key, undecodable_id)  # noqa: SLF001
    fake_pipe.hdel.assert_called_once_with(subscriber._config.topic_payloads_key, undecodable_id)  # noqa: SLF001
    fake_pipe.execute.assert_awaited_once()


# --- eval_cached NOSCRIPT fallback (O1) ---


Expand Down
Loading