Merged
7 changes: 4 additions & 3 deletions README.md
@@ -134,9 +134,10 @@ Per-subscriber knobs (passed to `@broker.subscriber("topic", ...)`):
   ack failed to land in Redis (network blip) will be retried; a handler that
   takes longer than `lease_ttl` may be re-delivered to another worker.
 - **Buggy handler retries forever.** If a handler always raises, the timer is
-  retried indefinitely. Raise `faststream.exceptions.RejectMessage` from your
-  handler to drop a poison-pill timer permanently. A built-in `max_attempts`
-  counter is planned for a future release.
+  retried every `lease_ttl` seconds indefinitely — there is no built-in
+  attempt counter. Raise `faststream.exceptions.RejectMessage` from your
+  handler to drop a poison-pill timer permanently, or track attempts in your
+  own state if you need a hard cap.
 
 ## High availability
 
2 changes: 1 addition & 1 deletion docs/usage/basic.md
@@ -79,7 +79,7 @@ async def schedule_reminder() -> None:
 
 | Parameter | Default | Description |
 |-----------|---------|-------------|
-| `client` | `None` | `redis.asyncio.Redis` client instance — the caller owns its lifecycle (the broker does not close it) |
+| `client` | _required for production_ | `redis.asyncio.Redis` client instance — the caller owns its lifecycle (the broker does not close it). The constructor accepts `None`, but a broker without a client raises `IncorrectState` on the first operation; `None` is the test-broker shape (see [Testing](./testing.md)). |
 | `timeline_key` | `timers_timeline` | Sorted set key name |
 | `payloads_key` | `timers_payloads` | Hash key name |
 | `start_timeout` | `3.0` | Seconds to wait for the subscriber's first Redis ping during startup |
7 changes: 7 additions & 0 deletions docs/usage/publisher.md
@@ -57,6 +57,13 @@ The `publish()` method on a publisher accepts the parameters below and returns t
 `correlation_id` explicitly if your tracing pipeline needs per-publish
 uniqueness.
 
+!!! note "Past activation times fire immediately"
+    `activate_in=timedelta(seconds=-5)` and `activate_at` set in the past both
+    schedule the timer at a negative-relative score — the next subscriber poll
+    picks it up and fires it. No error is raised; this lets `activate_at`
+    computations that take "marginally too long" still deliver. If you want
+    strict scheduling, validate at the call site before publishing.
+
 ### Tracing & headers example
 
 ```python
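The "validate at the call site" suggestion in the publisher note above might look like the following. The helper name and the one-second skew allowance are illustrative choices, not part of the library:

```python
from datetime import datetime, timedelta, timezone


def validated_activate_at(
    activate_at: datetime,
    *,
    skew: timedelta = timedelta(seconds=1),  # tolerate small clock drift / compute time
) -> datetime:
    """Raise instead of letting a past activation time silently fire immediately."""
    now = datetime.now(timezone.utc)
    if activate_at < now - skew:
        raise ValueError(f"activate_at {activate_at.isoformat()} is in the past")
    return activate_at
```

Call it just before `publish(..., activate_at=...)` when a stale computed timestamp should be treated as a bug rather than delivered on the next poll.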
8 changes: 6 additions & 2 deletions docs/usage/subscriber.md
@@ -45,8 +45,10 @@ The `timer_id` is available as the message's `message_id`. Inject it via `Contex
 ```python
 from faststream import Context
 from faststream_redis_timers import TimersBroker
+from redis.asyncio import Redis
 
-broker = TimersBroker()
+client = Redis.from_url("redis://localhost:6379")
+broker = TimersBroker(client)
 
 
 @broker.subscriber("invoices")
@@ -79,11 +81,13 @@ Configure polling behaviour per subscriber:
 async def handle_urgent(body: str) -> None: ...
 ```
 
-The poll loop uses adaptive backoff: when there are no due timers, the next sleep doubles from `polling_interval` up to `max_polling_interval` and is multiplied by a random factor in `[0.5, 1.5]` to avoid thundering-herd bursts across worker fleets. The counter resets the moment a poll returns work. Worst-case delivery latency for a newly-published timer in a previously-idle queue is `max_polling_interval × 1.5`.
+The poll loop uses adaptive backoff: when there are no due timers, the next sleep doubles from `polling_interval` up to `max_polling_interval` and is multiplied by a random factor in `[0.5, 1.5]` to avoid thundering-herd bursts across worker fleets. The counter resets the moment a poll returns work. Worst-case delivery latency for a newly-published timer in a previously-idle queue is `max_polling_interval × 1.5`, plus any time spent waiting for the `max_concurrent` limiter when in-flight handlers are still holding capacity (back-pressure).
 
 !!! warning "Handlers must be idempotent and concurrency-safe"
     A handler that runs longer than `lease_ttl`, or a worker that crashes after the handler ran but before the commit landed, may cause the timer to be delivered more than once. Design handlers to be safe under retry. Because `max_concurrent` invocations run in parallel, handlers must also be safe under concurrent execution (no unsynchronized shared state).
 
+    A handler that raises *every* time is also retried indefinitely — the lease score expires after `lease_ttl` seconds and the next poll re-claims the timer, so `lease_ttl` is the lower bound on the retry interval (e.g., the default `lease_ttl=30` retries a poison-pill handler about twice a minute). Raise `faststream.exceptions.RejectMessage` from your handler to permanently drop a timer that should never be retried.
+
 ## Ack policy
 
 The default ack policy is `NACK_ON_ERROR`: the timer is acknowledged (removed from Redis) on success, and left for retry on any unhandled exception.
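The backoff rule the subscriber docs describe can be sketched as a pure function. This is an illustration of the documented behaviour, not the library's actual code: the function name is invented, the defaults are placeholders, and I assume the doubling exponent is the count of consecutive empty polls.

```python
import random


def next_poll_delay(
    consecutive_empty_polls: int,
    polling_interval: float = 0.1,       # placeholder default
    max_polling_interval: float = 5.0,   # placeholder default
) -> float:
    """Double the sleep per consecutive empty poll, cap it, then jitter it."""
    base = min(polling_interval * (2 ** consecutive_empty_polls), max_polling_interval)
    # Jitter in [0.5, 1.5] desynchronizes workers that went idle together,
    # which is where the documented worst case of max_polling_interval * 1.5 comes from.
    return base * random.uniform(0.5, 1.5)
```

A poll that returns work resets `consecutive_empty_polls` to zero, so a busy queue is always polled at roughly `polling_interval`.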
1 change: 1 addition & 0 deletions docs/usage/testing.md
@@ -87,6 +87,7 @@ Each `ScheduledTimer` has `topic`, `timer_id`, `activate_at`, `body`, `correlati
 - `timer_id` is passed through normally and available in the handler via the message.
 - `cancel_timer` / `publisher.cancel()` are no-ops in tests — since messages are delivered immediately, by the time you'd cancel, the handler has already run.
 - `publisher.fetch_redis_timers()` always returns `[]` in tests — timers are delivered instantly, so there are never any pending entries.
+- The inspection / bulk-cancel methods follow the same "nothing pending" contract: `broker.has_pending(...)` returns `False`, `broker.get_pending_timers(...)` returns `[]`, and `broker.cancel_all(...)` returns `0`. Use `TestTimersBroker.scheduled_timers` to assert *what was published*; the inspection API is for production introspection.
 - The fake producer uses the same envelope format as the real one, so all serialization paths are exercised.
 
 ## pytest-asyncio configuration
15 changes: 13 additions & 2 deletions faststream_redis_timers/testing.py
@@ -2,7 +2,7 @@
 from contextlib import contextmanager
 from dataclasses import dataclass
 from datetime import datetime
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, MagicMock
 
 from faststream._internal.testing.broker import TestBroker, change_producer
 
@@ -63,8 +63,19 @@ def _patch_producer(self, broker: TimersBroker) -> "Iterator[None]":
 
     @contextmanager
     def _patch_broker(self, broker: TimersBroker) -> "Iterator[None]":
+        # Test-broker contract: messages deliver immediately, so there are
+        # never any pending timers. Stub the inspection paths to return that.
         mock_client = AsyncMock()
-        mock_client.zrangebyscore.return_value = []
+        mock_client.zrangebyscore.return_value = []  # get_pending_timers -> []
+        mock_client.zscore.return_value = None  # has_pending -> False
+        # cancel_all uses `async with client.pipeline(...) as pipe`. AsyncMock's
+        # default makes pipeline() return a coroutine, which can't be entered.
+        # Use MagicMock so the call returns the AsyncMock directly; AsyncMock
+        # natively supports the async-context-manager protocol.
+        mock_pipe = AsyncMock()
+        mock_pipe.__aenter__.return_value = mock_pipe  # `async with ... as pipe` -> mock_pipe
+        mock_pipe.execute.return_value = [0]  # zcard result -> 0 removed
+        mock_client.pipeline = MagicMock(return_value=mock_pipe)
         connection = broker.config.broker_config.connection
         original_client = connection._client  # noqa: SLF001
         connection._client = mock_client  # noqa: SLF001
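The MagicMock-wrapping trick in the change above can be checked in isolation with nothing but the stdlib. The function below is a hypothetical stand-in mimicking the shape of the real `cancel_all` (the `transaction=True` argument is illustrative); the key point is that `pipeline()` is a plain synchronous call, so an `AsyncMock` there would hand `async with` a coroutine it cannot enter.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def cancel_all_shape() -> list:
    """Mimics the `async with client.pipeline(...) as pipe` shape being stubbed."""
    client = AsyncMock()
    pipe = AsyncMock()
    pipe.__aenter__.return_value = pipe   # `async with ... as p` binds p to pipe
    pipe.execute.return_value = [0]       # awaited execute() yields the stubbed result
    # pipeline() itself must be a sync call returning the async context manager:
    client.pipeline = MagicMock(return_value=pipe)
    async with client.pipeline(transaction=True) as p:
        return await p.execute()


print(asyncio.run(cancel_all_shape()))  # prints [0]
```

Dropping the `MagicMock` line reproduces the failure the comment in the diff warns about: `client.pipeline(...)` would then be an `AsyncMock` call returning a coroutine, and `async with` raises a `TypeError`.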
13 changes: 13 additions & 0 deletions tests/test_fake.py
@@ -87,6 +87,19 @@ async def test_fake_broker_fetch_redis_timers_returns_empty() -> None:
     assert result == []
 
 
+async def test_fake_broker_inspection_methods_report_no_pending() -> None:
+    broker = TimersBroker()
+
+    @broker.subscriber("topic")
+    async def handler(body: str) -> None: ...
+
+    async with TestTimersBroker(broker):
+        await broker.publish("hi", topic="topic", timer_id="t-1")
+        assert await broker.has_pending("topic", "t-1") is False
+        assert await broker.get_pending_timers("topic") == []
+        assert await broker.cancel_all("topic") == 0
+
+
 # --- scheduled_timers inspection (U8) ---