5 changes: 3 additions & 2 deletions CLAUDE.md
@@ -28,12 +28,13 @@ The library provides concurrent Kafka message processing for FastStream. Modules
**`processing.py` — `KafkaConcurrentHandler`**
The core engine. One handler is created per `initialize_concurrent_processing` call and stored in FastStream's `ContextRepo` under the key `"concurrent_processing"`. It is *not* a singleton — calling `stop_concurrent_processing` clears the context entry so a fresh handler can be initialised. The handler manages:
- An `asyncio.Semaphore` for concurrency limiting (minimum: 1)
- In-flight task tracking via a counter (`_tracked_count`) + `asyncio.Event` (`_all_done_event`); the per-task done-callback (`_finish_task`) releases the semaphore, decrements the counter, and sets the event when it reaches zero. `wait_for_subtasks` awaits the event with a timeout
- Signal handlers (SIGTERM/SIGINT) that trigger graceful shutdown
- A `set[asyncio.Task]` (`_tracked_tasks`) holding in-flight user tasks; the per-task done-callback (`_finish_task`) releases the semaphore and removes the task from the set
- A `KafkaBatchCommitter` for offset commits

Key design: `handle_task()` fires-and-forgets the user coroutine as an asyncio task and enqueues a `KafkaCommitTask` on the committer. Offsets are not committed until the user task finishes (at-least-once semantics).

`stop()` cancels every in-flight tracked task, then awaits `committer.close()`. The committer treats cancelled tasks as a hard offset boundary (see `batch_committer.py`), so cancelled-and-after offsets stay uncommitted and get redelivered on restart. Total wall-clock for shutdown is bounded by the committer's own `shutdown_timeout_sec` (default 20 s) and is sub-second in normal conditions. The handler does *not* install signal handlers — shutdown is driven by the FastStream lifespan calling `stop_concurrent_processing`.
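
A condensed sketch of that design (simplified from `processing.py` in the diff below; error logging and the committer hand-off are omitted):

```python
import asyncio
import typing


class SketchHandler:
    """Semaphore for back-pressure + a set of tracked tasks for shutdown."""

    def __init__(self, concurrency_limit: int = 10) -> None:
        self._limiter = asyncio.Semaphore(concurrency_limit)
        self._tracked_tasks: set[asyncio.Task[typing.Any]] = set()

    def _finish_task(self, task: asyncio.Task[typing.Any]) -> None:
        # Done-callback: free a concurrency slot and drop the task reference.
        self._limiter.release()
        self._tracked_tasks.discard(task)

    async def handle(self, coroutine: typing.Coroutine[typing.Any, typing.Any, None]) -> None:
        await self._limiter.acquire()  # blocks once concurrency_limit tasks are in flight
        task = asyncio.ensure_future(coroutine)  # fire-and-forget the user handler
        self._tracked_tasks.add(task)
        task.add_done_callback(self._finish_task)

    async def stop(self) -> None:
        # Cancelled tasks never get their offsets committed, so their
        # messages are redelivered on restart (at-least-once).
        for task in list(self._tracked_tasks):
            if not task.done():
                task.cancel()
```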

**`middleware.py` — FastStream middleware + lifecycle functions**
- `KafkaConcurrentProcessingMiddleware`: FastStream `BaseMiddleware` subclass. Its `consume_scope` retrieves the handler from `self.context`. It passes through (a) FakeConsumer (TestKafkaBroker) and (b) any subscriber whose ack policy is not MANUAL (`kafka_message.committed is not None`). It refuses if `_enable_auto_commit=True` on the consumer. If the handler has been stopped, it logs a warning and skips the message (the offset stays uncommitted, so the message is redelivered on restart).
- `initialize_concurrent_processing(context, ...)`: create and start a handler, store it in context.
23 changes: 18 additions & 5 deletions README.md
@@ -14,8 +14,8 @@ By default FastStream processes Kafka messages sequentially — one message at a time
- Configurable concurrency limit (semaphore-based)
- Batch offset committing per partition after each task completes
- Rebalance-safe: pending offsets are flushed on partition revocation via `ConsumerRebalanceListener`
- Graceful shutdown: waits up to `shutdown_timeout_sec` (default 20 s) for in-flight tasks before exiting
- Signal handling (SIGTERM / SIGINT) triggers graceful shutdown
- Fast shutdown: cancels in-flight tasks; uncommitted offsets are redelivered on restart (at-least-once)
- Signal handling owned by your lifespan / process manager — this lib does not register SIGTERM/SIGINT handlers
- Handler exceptions are logged but do not crash the consumer
- Health check helper to probe handler status from a `ContextRepo`
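
A minimal wiring sketch. The broker address, topic name, and hook usage are illustrative, and registration details may vary with your FastStream version; subscribers must use a manual ack policy for the middleware to engage.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka.middleware import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092", middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def startup() -> None:
    await initialize_concurrent_processing(broker.context, concurrency_limit=10)


@app.on_shutdown
async def shutdown() -> None:
    await stop_concurrent_processing(broker.context)


@broker.subscriber("orders", auto_commit=False)  # manual ack policy is required
async def handle_order(body: str) -> None:
    ...  # runs concurrently, up to concurrency_limit tasks at a time
```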

@@ -110,13 +110,13 @@ Create and start the concurrent processing handler; store it in FastStream's con
| `concurrency_limit` | `10` | Max concurrent asyncio tasks (minimum: 1) |
| `commit_batch_size` | `10` | Max messages per commit batch |
| `commit_batch_timeout_sec` | `10.0` | Max seconds before flushing a batch |
| `shutdown_timeout_sec` | `20.0` | Max seconds to wait for the batch committer to flush AND for in-flight handlers to finish during graceful shutdown |
| `shutdown_timeout_sec` | `20.0` | Max seconds the batch committer waits for its background task to drain before forcing cancellation |

Returns the `KafkaConcurrentHandler` instance.
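
For example (values illustrative):

```python
handler = await initialize_concurrent_processing(
    broker.context,
    concurrency_limit=32,          # at most 32 handler tasks in flight
    commit_batch_size=100,         # flush a commit batch every 100 messages...
    commit_batch_timeout_sec=5.0,  # ...or every 5 seconds, whichever comes first
    shutdown_timeout_sec=20.0,     # committer drain deadline during shutdown
)
```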

### `stop_concurrent_processing(context)`

Flush pending commits, wait for in-flight tasks (up to `shutdown_timeout_sec`), then stop the handler.
Cancel all in-flight handler tasks, flush completed offsets via the committer, then stop the handler. Uncommitted offsets (from cancelled tasks or anything queued past a cancelled offset) are redelivered on restart — at-least-once.

### `is_kafka_handler_healthy(context)`

@@ -150,7 +150,20 @@ modern_di_faststream.setup_di(app, container=container)  # registered after

4. **Rebalance handling**: When Kafka revokes a partition, the `ConsumerRebalanceListener` (returned by `handler.create_rebalance_listener()`) calls `committer.commit_all()` to flush pending offsets before the partition is reassigned. This prevents in-flight messages from being redelivered to the new owner.
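
A sketch of where the listener attaches, assuming direct access to the underlying `AIOKafkaConsumer` (`handler` being the instance returned by `initialize_concurrent_processing`):

```python
# Flushes pending offsets before a partition is handed to another consumer.
listener = handler.create_rebalance_listener()
consumer.subscribe(topics=["orders"], listener=listener)  # aiokafka subscribe API
```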

5. **Graceful shutdown**: `stop_concurrent_processing` flushes the committer, then awaits all in-flight tasks via an `asyncio.Event` (set when the in-flight counter reaches zero) bounded by `shutdown_timeout_sec` (default 20 s), then removes the signal handlers.
5. **Shutdown**: `stop_concurrent_processing` cancels every in-flight asyncio task, then awaits `committer.close()`. The committer treats cancelled tasks as a hard offset boundary — cancelled-and-after offsets stay uncommitted and get redelivered on restart. Total wall-clock is sub-second in normal conditions and bounded by `shutdown_timeout_sec` only as a safety net for stuck network commits.

## Migration from < 0.x

Previously, `stop_concurrent_processing` waited up to `2 × shutdown_timeout_sec` for in-flight handlers to drain to completion. The new behavior cancels them immediately. The at-least-once contract is unchanged — uncommitted offsets are redelivered on restart, the same way they always were when the handler crashed mid-task.

| What changed | Old | New |
|---|---|---|
| In-flight handler tasks on stop | drained to completion | **cancelled** |
| `KafkaConcurrentHandler.wait_for_subtasks()` | public method | removed |
| `shutdown_timeout_sec` | applied separately to handler and committer | applied to committer only |
| Signal handler installation | installed automatically | removed — own them via your lifespan / process manager |

If your handlers do non-idempotent work that's expensive to repeat, wrap them in `try/finally` so cleanup runs on `CancelledError`, or pin to the previous version of this library. To trigger shutdown on SIGTERM/SIGINT, your lifespan or main entry point must catch the signal and call `stop_concurrent_processing(broker.context)`; under uvicorn / AsgiFastStream this happens automatically through the lifespan `finally` block, as sketched below.
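
Both patterns in one sketch (`acquire_resource` and `do_work` are hypothetical placeholders; the lifespan signature may differ across FastStream versions):

```python
import contextlib

from faststream.asgi import AsgiFastStream


@broker.subscriber("orders", auto_commit=False)
async def handle_order(body: str) -> None:
    resource = await acquire_resource()  # hypothetical setup
    try:
        await do_work(resource, body)  # hypothetical non-idempotent work
    finally:
        # Runs on CancelledError too; shield the close() if it must not be interrupted.
        await resource.close()


@contextlib.asynccontextmanager
async def lifespan():
    await initialize_concurrent_processing(broker.context)
    try:
        yield
    finally:
        # uvicorn turns SIGTERM/SIGINT into lifespan shutdown, which lands here.
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)
```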

## Requirements

8 changes: 7 additions & 1 deletion faststream_concurrent_aiokafka/middleware.py
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import dataclasses
import logging
@@ -105,6 +106,12 @@ async def consume_scope( # ty: ignore[invalid-method-override]
# The user handler already fired; the offset stays uncommitted, so the message
# will be redelivered on restart (at-least-once).
logger.warning("Kafka middleware. Handler is shutting down, skipping message")
except asyncio.CancelledError:
# stop() cancelled this task while handle_task was awaiting send_task. Offset
# stays uncommitted → redelivered on restart. Propagate so FastStream's chain
# can run its own cleanup.
logger.warning("Kafka middleware. Task cancelled during shutdown")
raise
return None


@@ -127,7 +134,6 @@ async def initialize_concurrent_processing(
shutdown_timeout_sec=shutdown_timeout_sec,
),
concurrency_limit=concurrency_limit,
shutdown_timeout_sec=shutdown_timeout_sec,
)
await concurrent_processing.start()
context.set_global(_PROCESSING_CONTEXT_KEY, concurrent_processing)
64 changes: 12 additions & 52 deletions faststream_concurrent_aiokafka/processing.py
@@ -1,7 +1,5 @@
import asyncio
import functools
import logging
import signal
import typing

from faststream.kafka import ConsumerRecord, TopicPartition
@@ -15,7 +13,6 @@
logger = logging.getLogger(__name__)


SIGNALS: typing.Final = (signal.SIGTERM, signal.SIGINT)
DEFAULT_CONCURRENCY_LIMIT: typing.Final = 10
DEFAULT_SHUTDOWN_TIMEOUT_SEC: typing.Final = 20.0

@@ -25,42 +22,25 @@ def __init__(
self,
committer: KafkaBatchCommitter,
concurrency_limit: int = DEFAULT_CONCURRENCY_LIMIT,
shutdown_timeout_sec: float = DEFAULT_SHUTDOWN_TIMEOUT_SEC,
) -> None:
if concurrency_limit < 1:
msg = f"concurrency_limit must be >= 1, got {concurrency_limit}"
raise ValueError(msg)

self._limiter = asyncio.Semaphore(concurrency_limit)
# Counter + Event replace the old _current_tasks set: shutdown waits on the event,
# which is set once every tracked task has fired its done-callback.
self._tracked_count: int = 0
self._all_done_event: asyncio.Event = asyncio.Event()
self._all_done_event.set() # 0 tasks ⇒ "all done" is True
# Tracked only so stop() can cancel them. The committer is the source of truth for
# offset progress; this set just lets us reach in-flight tasks at shutdown.
self._tracked_tasks: set[asyncio.Task[typing.Any]] = set()
self._is_running: bool = False
self._committer: KafkaBatchCommitter = committer
self._stop_task: asyncio.Task[typing.Any] | None = None
self._shutdown_timeout_sec: float = shutdown_timeout_sec

async def wait_for_subtasks(self) -> None:
logger.info("Kafka middleware. Gracefully waiting for tasks to end...")
try:
await asyncio.wait_for(
self._all_done_event.wait(),
timeout=self._shutdown_timeout_sec,
)
except TimeoutError:
logger.exception("Kafka middleware. Whoops, some tasks haven't finished in graceful time, sorry")

def _finish_task(self, task: asyncio.Task[typing.Any]) -> None:
self._limiter.release()
self._tracked_tasks.discard(task)
if not task.cancelled():
exc: typing.Final[BaseException | None] = task.exception()
if exc:
logger.error("Kafka middleware. Task has failed with the exception", exc_info=exc)
self._tracked_count -= 1
if self._tracked_count == 0:
self._all_done_event.set()

async def handle_task(
self,
@@ -70,13 +50,7 @@ async def handle_task(
) -> None:
await self._limiter.acquire()
task: typing.Final = asyncio.ensure_future(coroutine)
# Increment + clear before add_done_callback so the counter already reflects this
# task by the time _finish_task can run. add_done_callback always schedules via
# loop.call_soon (never synchronous), but the callback could fire on the very next
# tick — once we yield at the send_task await below — so the bookkeeping must be
# consistent before that point.
self._tracked_count += 1
self._all_done_event.clear()
self._tracked_tasks.add(task)
task.add_done_callback(self._finish_task)
try:
await self._committer.send_task(
@@ -92,19 +66,6 @@
await self.stop()
raise

def _setup_signal_handlers(self) -> None:
loop: typing.Final = asyncio.get_running_loop()
for sig in SIGNALS:
loop.add_signal_handler(
sig,
functools.partial(self._signal_handler, sig),
)
logger.debug(f"Kafka middleware. Registered handler for {sig.name}")

def _signal_handler(self, sig: signal.Signals) -> None:
logger.info(f"Kafka middleware. Received signal {sig.name}, initiating graceful shutdown...")
self._stop_task = asyncio.create_task(self.stop())

async def start(self) -> None:
if self._is_running:
return
@@ -113,7 +74,6 @@ async def start(self) -> None:
self._is_running = True

self._committer.spawn()
self._setup_signal_handlers()
logger.info("Kafka middleware is ready to process messages.")

async def stop(self) -> None:
@@ -122,15 +82,15 @@ async def stop(self) -> None:
logger.info("Kafka middleware. Shutting down middleware handler")
self._is_running = False

# Cancel in-flight user tasks. The committer treats cancelled tasks as a hard
# offset boundary (batch_committer._extract_ready_prefixes / _map_offsets_per_partition):
# cancelled-and-after offsets stay uncommitted and get redelivered on restart.
for task in list(self._tracked_tasks):
if not task.done():
task.cancel()

await self._committer.close()
await self.wait_for_subtasks()

try:
loop = asyncio.get_running_loop()
for sig in SIGNALS:
loop.remove_signal_handler(sig)
except Exception: # noqa: BLE001
logger.warning("Kafka middleware. Exception raised while removing signal handlers", exc_info=True)
logger.info("Kafka middleware. Complete shutting down middleware handler")

def create_rebalance_listener(self) -> ConsumerRebalanceListener: