From 1f44a3910d4a594d3bbe3d5fe917a3ebff3689fa Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 08:31:27 +0300 Subject: [PATCH] fix doc drift and misleading add_done_callback comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit README/CLAUDE.md: SIGQUIT was never wired up (only SIGTERM/SIGINT); shutdown timeout default is 20 s not 10 s; the in-flight task tracker is now a counter + asyncio.Event (set was dropped in 1aba826); the shutdown wait uses Event.wait() not asyncio.gather; KafkaBatchCommitter description rewritten for the streaming loop. Also corrected the DI middleware ordering claim — first registered is outermost in FastStream broker_middlewares (verified against faststream _internal/endpoint/subscriber). processing.py / batch_committer.py: comments claimed add_done_callback fires synchronously when the future is already done; per Python docs it always schedules via loop.call_soon. Reworded to describe the actual mechanism. Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 4 ++-- README.md | 16 ++++++++-------- .../batch_committer.py | 6 ++++-- faststream_concurrent_aiokafka/processing.py | 9 +++++---- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 408b3de..436498b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -28,8 +28,8 @@ The library provides concurrent Kafka message processing for FastStream. Modules **`processing.py` — `KafkaConcurrentHandler`** The core engine. One handler is created per `initialize_concurrent_processing` call and stored in FastStream's `ContextRepo` under the key `"concurrent_processing"`. It is *not* a singleton — calling `stop_concurrent_processing` clears the context entry so a fresh handler can be initialised. 
The handler manages: - An `asyncio.Semaphore` for concurrency limiting (minimum: 1) -- A set of in-flight `asyncio.Task`s, with a done-callback (`_finish_task`) that releases the semaphore and discards the task -- Signal handlers (SIGTERM/SIGINT/SIGQUIT) that trigger graceful shutdown +- In-flight task tracking via a counter (`_tracked_count`) + `asyncio.Event` (`_all_done_event`); the per-task done-callback (`_finish_task`) releases the semaphore, decrements the counter, and sets the event when it reaches zero. `wait_for_subtasks` awaits the event with a timeout +- Signal handlers (SIGTERM/SIGINT) that trigger graceful shutdown - A `KafkaBatchCommitter` for offset commits Key design: `handle_task()` fires-and-forgets the user coroutine as an asyncio task and enqueues a `KafkaCommitTask` on the committer. Offsets are not committed until the user task finishes (at-least-once semantics). diff --git a/README.md b/README.md index 1a36bfd..f6252e8 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,8 @@ By default FastStream processes Kafka messages sequentially — one message at a - Configurable concurrency limit (semaphore-based) - Batch offset committing per partition after each task completes - Rebalance-safe: pending offsets are flushed on partition revocation via `ConsumerRebalanceListener` -- Graceful shutdown: waits up to 10 s for in-flight tasks before exiting -- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown +- Graceful shutdown: waits up to `shutdown_timeout_sec` (default 20 s) for in-flight tasks before exiting +- Signal handling (SIGTERM / SIGINT) triggers graceful shutdown - Handler exceptions are logged but do not crash the consumer - Health check helper to probe handler status from a `ContextRepo` @@ -89,14 +89,14 @@ A FastStream `BaseMiddleware` subclass. Add it to your broker to enable concurre The processing engine. 
Manages: - An `asyncio.Semaphore` to enforce `concurrency_limit` -- A set of in-flight asyncio tasks (each task's done-callback releases the semaphore and discards the task) +- In-flight task tracking via a counter + `asyncio.Event` (each task's done-callback releases the semaphore, decrements the counter, and sets the event when it reaches zero) - A `KafkaBatchCommitter` for offset commits - Signal handlers for graceful shutdown - An optional `ConsumerRebalanceListener` (via `handler.create_rebalance_listener()`) that flushes pending commits when partitions are revoked ### KafkaBatchCommitter -Runs as a background asyncio task. Receives `KafkaCommitTask` objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, `CommitterIsDeadError` is raised to callers. +Runs as a background asyncio task. A streaming loop absorbs `KafkaCommitTask` objects into per-partition pending state and commits each partition's contiguous-done prefix when total pending crosses `commit_batch_size`, when `commit_batch_timeout_sec` fires, or when `commit_all`/`close` sets the flush event. Cancelled tasks are treated as a hard boundary — the offset advance stops at the cancelled task so it gets redelivered on restart (at-least-once). If the committer's task dies, `CommitterIsDeadError` is raised to callers. ## API Reference @@ -132,12 +132,12 @@ FastStream middleware class. Register it via `broker.add_middleware(...)`. See Q DI frameworks like `modern-di-faststream` register a broker-level middleware that creates a REQUEST-scoped dependency container around each message. If that middleware is **outer** to `KafkaConcurrentProcessingMiddleware`, its scope closes as soon as `consume_scope` returns — before the background task runs — so any dependencies resolved inside the task (database sessions, repositories, …) are created from an already-closed container. 
Their finalizers never run, leaving connections unreturned to the pool. -**Fix**: call `broker.add_middleware(KafkaConcurrentProcessingMiddleware)` **before** `setup_di(...)` (or any equivalent DI bootstrap call). FastStream stacks middlewares so the last registered is outermost; adding KCM first ensures the DI middleware ends up inside the background task where it can manage the scope lifetime correctly. +**Fix**: call `broker.add_middleware(KafkaConcurrentProcessingMiddleware)` **before** `setup_di(...)` (or any equivalent DI bootstrap call). FastStream stacks broker middlewares so the **first** registered is outermost; adding KCM first makes it wrap the DI middleware, so the DI middleware runs *inside* KCM's background task and can manage the scope lifetime correctly. ```python broker = KafkaBroker(...) -broker.add_middleware(KafkaConcurrentProcessingMiddleware) # must come first -modern_di_faststream.setup_di(app, container=container) # adds DI middleware after → inner to KCM +broker.add_middleware(KafkaConcurrentProcessingMiddleware) # registered first → outermost +modern_di_faststream.setup_di(app, container=container) # registered after → inner to KCM ``` ## How It Works @@ -150,7 +150,7 @@ modern_di_faststream.setup_di(app, container=container) # adds DI middleware 4. **Rebalance handling**: When Kafka revokes a partition, the `ConsumerRebalanceListener` (returned by `handler.create_rebalance_listener()`) calls `committer.commit_all()` to flush pending offsets before the partition is reassigned. This prevents in-flight messages from being redelivered to the new owner. -5. **Graceful shutdown**: `stop_concurrent_processing` flushes the committer, then awaits all in-flight tasks via `asyncio.gather` with a 10-second timeout, then removes the signal handlers. +5. 
**Graceful shutdown**: `stop_concurrent_processing` flushes the committer, then awaits all in-flight tasks via an `asyncio.Event` (set when the in-flight counter reaches zero) bounded by `shutdown_timeout_sec` (default 20 s), then removes the signal handlers. ## Requirements diff --git a/faststream_concurrent_aiokafka/batch_committer.py b/faststream_concurrent_aiokafka/batch_committer.py index 8846c5a..094d255 100644 --- a/faststream_concurrent_aiokafka/batch_committer.py +++ b/faststream_concurrent_aiokafka/batch_committer.py @@ -95,8 +95,10 @@ def _on_user_task_done(self, _task: asyncio.Future[typing.Any]) -> None: self._task_completed_event.set() def _track_user_task(self, ct: KafkaCommitTask) -> None: - # add_done_callback fires the callback synchronously if the future is already done, - # so a task that completed between create_task and absorb still triggers the wakeup. + # If the task is already done by the time we absorb it, add_done_callback still + # schedules _on_user_task_done via loop.call_soon — it fires on the next tick and + # wakes the streaming loop, so a task that completed between create_task and + # absorb still triggers the wakeup. ct.asyncio_task.add_done_callback(self._on_user_task_done) def _check_is_commit_task_running(self) -> None: diff --git a/faststream_concurrent_aiokafka/processing.py b/faststream_concurrent_aiokafka/processing.py index 3bd6f52..0612821 100644 --- a/faststream_concurrent_aiokafka/processing.py +++ b/faststream_concurrent_aiokafka/processing.py @@ -70,10 +70,11 @@ async def handle_task( ) -> None: await self._limiter.acquire() task: typing.Final = asyncio.ensure_future(coroutine) - # Increment + clear before add_done_callback. add_done_callback fires synchronously - # if the task is already done; that path then immediately decrements back to a - # consistent state. Reverse order would skew the count for a synchronously-finished - # task. 
+ # Increment + clear before add_done_callback so the counter already reflects this + # task by the time _finish_task can run. asyncio's add_done_callback always schedules + # via loop.call_soon (never synchronously, even if the task is already done — unlike + # concurrent.futures), but the callback could fire on the very next tick — once we + # yield at the send_task await below — so the bookkeeping must be consistent first. self._tracked_count += 1 self._all_done_event.clear() task.add_done_callback(self._finish_task)