4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -28,8 +28,8 @@ The library provides concurrent Kafka message processing for FastStream. Modules
**`processing.py` — `KafkaConcurrentHandler`**
The core engine. One handler is created per `initialize_concurrent_processing` call and stored in FastStream's `ContextRepo` under the key `"concurrent_processing"`. It is *not* a singleton — calling `stop_concurrent_processing` clears the context entry so a fresh handler can be initialised. The handler manages:
- An `asyncio.Semaphore` for concurrency limiting (minimum: 1)
- - A set of in-flight `asyncio.Task`s, with a done-callback (`_finish_task`) that releases the semaphore and discards the task
- - Signal handlers (SIGTERM/SIGINT/SIGQUIT) that trigger graceful shutdown
+ - In-flight task tracking via a counter (`_tracked_count`) + `asyncio.Event` (`_all_done_event`); the per-task done-callback (`_finish_task`) releases the semaphore, decrements the counter, and sets the event when it reaches zero. `wait_for_subtasks` awaits the event with a timeout
+ - Signal handlers (SIGTERM/SIGINT) that trigger graceful shutdown
- A `KafkaBatchCommitter` for offset commits

Key design: `handle_task()` fires-and-forgets the user coroutine as an asyncio task and enqueues a `KafkaCommitTask` on the committer. Offsets are not committed until the user task finishes (at-least-once semantics).
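
> Reviewer note: a minimal sketch of the counter + event pattern described in the bullets above. The attribute names (`_tracked_count`, `_all_done_event`, `_finish_task`, `wait_for_subtasks`) follow this diff; everything else is illustrative, not the library's actual code.

```python
import asyncio
import typing


class InFlightTrackerSketch:
    """Illustrative stand-in for KafkaConcurrentHandler's task bookkeeping."""

    def __init__(self, concurrency_limit: int) -> None:
        self._limiter = asyncio.Semaphore(max(concurrency_limit, 1))  # minimum: 1
        self._tracked_count = 0
        self._all_done_event = asyncio.Event()
        self._all_done_event.set()  # nothing in flight yet

    async def handle_task(self, coroutine: typing.Coroutine[typing.Any, typing.Any, None]) -> None:
        await self._limiter.acquire()
        task = asyncio.ensure_future(coroutine)
        # Bookkeeping first, then the callback (same ordering as processing.py below).
        self._tracked_count += 1
        self._all_done_event.clear()
        task.add_done_callback(self._finish_task)

    def _finish_task(self, _task: asyncio.Future) -> None:
        self._limiter.release()
        self._tracked_count -= 1
        if self._tracked_count == 0:
            self._all_done_event.set()  # wakes wait_for_subtasks

    async def wait_for_subtasks(self, timeout_sec: float) -> None:
        # Bounded wait: either every in-flight task finished, or the deadline hit.
        try:
            await asyncio.wait_for(self._all_done_event.wait(), timeout=timeout_sec)
        except asyncio.TimeoutError:
            pass  # deadline hit; remaining tasks are left running in this sketch
```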
16 changes: 8 additions & 8 deletions README.md
@@ -14,8 +14,8 @@ By default FastStream processes Kafka messages sequentially — one message at a
- Configurable concurrency limit (semaphore-based)
- Batch offset committing per partition after each task completes
- Rebalance-safe: pending offsets are flushed on partition revocation via `ConsumerRebalanceListener`
- - Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
- - Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
+ - Graceful shutdown: waits up to `shutdown_timeout_sec` (default 20 s) for in-flight tasks before exiting
+ - Signal handling (SIGTERM / SIGINT) triggers graceful shutdown
- Handler exceptions are logged but do not crash the consumer
- Health check helper to probe handler status from a `ContextRepo`

@@ -89,14 +89,14 @@ A FastStream `BaseMiddleware` subclass. Add it to your broker to enable concurre

The processing engine. Manages:
- An `asyncio.Semaphore` to enforce `concurrency_limit`
- - A set of in-flight asyncio tasks (each task's done-callback releases the semaphore and discards the task)
+ - In-flight task tracking via a counter + `asyncio.Event` (each task's done-callback releases the semaphore, decrements the counter, and sets the event when it reaches zero)
- A `KafkaBatchCommitter` for offset commits
- Signal handlers for graceful shutdown
- An optional `ConsumerRebalanceListener` (via `handler.create_rebalance_listener()`) that flushes pending commits when partitions are revoked

### KafkaBatchCommitter

- Runs as a background asyncio task. Receives `KafkaCommitTask` objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, `CommitterIsDeadError` is raised to callers.
+ Runs as a background asyncio task. A streaming loop absorbs `KafkaCommitTask` objects into per-partition pending state and commits each partition's contiguous-done prefix when total pending crosses `commit_batch_size`, when `commit_batch_timeout_sec` fires, or when `commit_all`/`close` sets the flush event. Cancelled tasks are treated as a hard boundary — the offset advance stops at the cancelled task so it gets redelivered on restart (at-least-once). If the committer's task dies, `CommitterIsDeadError` is raised to callers.

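> Reviewer note: to make "contiguous-done prefix" concrete, here is an illustrative helper (not the library's code) that picks the committable offset for one partition from pending tasks ordered by offset:

```python
import asyncio


def contiguous_done_prefix(pending: list[tuple[int, asyncio.Task]]) -> int | None:
    """Highest offset safe to commit, or None if the head task is still running.

    `pending` holds (offset, task) pairs for one partition, sorted by offset.
    The scan stops at the first unfinished *or cancelled* task, so a cancelled
    task is a hard boundary: its offset is never committed and the message is
    redelivered after restart (at-least-once).
    """
    committable: int | None = None
    for offset, task in pending:
        if not task.done() or task.cancelled():
            break
        committable = offset
    return committable
```

A real committer would commit `committable + 1` (a committed Kafka offset points at the next message to consume) and then drop the committed prefix from its pending state.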
## API Reference

@@ -132,12 +132,12 @@ FastStream middleware class. Register it via `broker.add_middleware(...)`. See Q

DI frameworks like `modern-di-faststream` register a broker-level middleware that creates a REQUEST-scoped dependency container around each message. If that middleware is **outer** to `KafkaConcurrentProcessingMiddleware`, its scope closes as soon as `consume_scope` returns — before the background task runs — so any dependencies resolved inside the task (database sessions, repositories, …) are created from an already-closed container. Their finalizers never run, leaving connections unreturned to the pool.

- **Fix**: call `broker.add_middleware(KafkaConcurrentProcessingMiddleware)` **before** `setup_di(...)` (or any equivalent DI bootstrap call). FastStream stacks middlewares so the last registered is outermost; adding KCM first ensures the DI middleware ends up inside the background task where it can manage the scope lifetime correctly.
+ **Fix**: call `broker.add_middleware(KafkaConcurrentProcessingMiddleware)` **before** `setup_di(...)` (or any equivalent DI bootstrap call). FastStream stacks broker middlewares so the **first** registered is outermost; adding KCM first makes it wrap the DI middleware, so the DI middleware runs *inside* KCM's background task and can manage the scope lifetime correctly.

```python
broker = KafkaBroker(...)
- broker.add_middleware(KafkaConcurrentProcessingMiddleware)  # must come first
- modern_di_faststream.setup_di(app, container=container)  # adds DI middleware after → inner to KCM
+ broker.add_middleware(KafkaConcurrentProcessingMiddleware)  # registered first → outermost
+ modern_di_faststream.setup_di(app, container=container)  # registered after → inner to KCM
```

## How It Works
@@ -150,7 +150,7 @@ modern_di_faststream.setup_di(app, container=container) # adds DI middleware

4. **Rebalance handling**: When Kafka revokes a partition, the `ConsumerRebalanceListener` (returned by `handler.create_rebalance_listener()`) calls `committer.commit_all()` to flush pending offsets before the partition is reassigned. This prevents messages that already finished processing (but whose offsets were not yet committed) from being redelivered to the new owner.

- 5. **Graceful shutdown**: `stop_concurrent_processing` flushes the committer, then awaits all in-flight tasks via `asyncio.gather` with a 10-second timeout, then removes the signal handlers.
+ 5. **Graceful shutdown**: `stop_concurrent_processing` flushes the committer, then awaits all in-flight tasks via an `asyncio.Event` (set when the in-flight counter reaches zero) bounded by `shutdown_timeout_sec` (default 20 s), then removes the signal handlers.

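> Reviewer note: two illustrative sketches for steps 4 and 5. The aiokafka `ConsumerRebalanceListener` base class and `loop.remove_signal_handler` are real APIs; the handler/committer attribute shapes are assumptions based on this diff, not the library's actual code.

```python
import asyncio
import signal

from aiokafka.abc import ConsumerRebalanceListener


class FlushOnRevokeListener(ConsumerRebalanceListener):
    """Step 4: flush committable offsets before Kafka reassigns our partitions."""

    def __init__(self, committer) -> None:
        self._committer = committer

    async def on_partitions_revoked(self, revoked) -> None:
        await self._committer.commit_all()  # new owner starts after the flushed offsets

    async def on_partitions_assigned(self, assigned) -> None:
        pass  # nothing to do on assignment in this sketch


async def stop_concurrent_processing_sketch(handler, shutdown_timeout_sec: float = 20.0) -> None:
    """Step 5 ordering: flush the committer, bounded drain, detach signal handlers."""
    await handler.committer.commit_all()
    try:  # drain: the event is set when the in-flight counter reaches zero
        await asyncio.wait_for(handler.all_done_event.wait(), timeout=shutdown_timeout_sec)
    except asyncio.TimeoutError:
        pass  # deadline hit; exit anyway
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.remove_signal_handler(sig)
```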
## Requirements

6 changes: 4 additions & 2 deletions faststream_concurrent_aiokafka/batch_committer.py
@@ -95,8 +95,10 @@ def _on_user_task_done(self, _task: asyncio.Future[typing.Any]) -> None:
        self._task_completed_event.set()

    def _track_user_task(self, ct: KafkaCommitTask) -> None:
-       # add_done_callback fires the callback synchronously if the future is already done,
-       # so a task that completed between create_task and absorb still triggers the wakeup.
+       # If the task is already done by the time we absorb it, add_done_callback still
+       # schedules _on_user_task_done via loop.call_soon — it fires on the next tick and
+       # wakes the streaming loop, so a task that completed between create_task and
+       # absorb still triggers the wakeup.
        ct.asyncio_task.add_done_callback(self._on_user_task_done)

    def _check_is_commit_task_running(self) -> None:
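> Reviewer note: the revised comment matches CPython's documented behavior: if the future is already done, `add_done_callback` schedules the callback with `loop.call_soon` instead of invoking it inline. A self-contained check:

```python
import asyncio


async def main() -> None:
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(None)  # done *before* the callback is attached

    ran = False

    def callback(_fut: asyncio.Future) -> None:
        nonlocal ran
        ran = True

    fut.add_done_callback(callback)
    assert not ran          # not invoked synchronously, even though fut is done
    await asyncio.sleep(0)  # yield one tick so the loop drains its call_soon queue
    assert ran              # fired on the next tick, waking any waiting loop


asyncio.run(main())
```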
9 changes: 5 additions & 4 deletions faststream_concurrent_aiokafka/processing.py
Expand Up @@ -70,10 +70,11 @@ async def handle_task(
) -> None:
await self._limiter.acquire()
task: typing.Final = asyncio.ensure_future(coroutine)
# Increment + clear before add_done_callback. add_done_callback fires synchronously
# if the task is already done; that path then immediately decrements back to a
# consistent state. Reverse order would skew the count for a synchronously-finished
# task.
# Increment + clear before add_done_callback so the counter already reflects this
# task by the time _finish_task can run. add_done_callback always schedules via
# loop.call_soon (never synchronous), but the callback could fire on the very next
# tick — once we yield at the send_task await below — so the bookkeeping must be
# consistent before that point.
self._tracked_count += 1
self._all_done_event.clear()
task.add_done_callback(self._finish_task)