From 3d85eeccf127c2f2961754e1102db04cf737bb88 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Tue, 5 May 2026 19:05:27 +0300 Subject: [PATCH] small committer cleanups - _extract_ready_prefixes returns (ready, count) so _maybe_commit doesn't re-sum partition lengths to update pending_count - drop dead `# type: ignore[misc]` from ConsumerRebalanceListener (mypy-era leftover; ty doesn't flag the class) - document _StreamingState invariants in a class docstring Co-Authored-By: Claude Opus 4.7 --- .../batch_committer.py | 25 +++++++++++++++---- faststream_concurrent_aiokafka/rebalance.py | 2 +- tests/test_kafka_committer.py | 12 ++++++--- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/faststream_concurrent_aiokafka/batch_committer.py b/faststream_concurrent_aiokafka/batch_committer.py index 3126462..575581a 100644 --- a/faststream_concurrent_aiokafka/batch_committer.py +++ b/faststream_concurrent_aiokafka/batch_committer.py @@ -35,6 +35,18 @@ class KafkaCommitTask: @dataclasses.dataclass(kw_only=True, slots=True) class _StreamingState: + """Mutable state for the streaming committer loop. + + Invariants maintained by `_streaming_iteration`: + * `pending_count == sum(len(v) for v in pending.values())`. + * `pending` empty ⇒ `timeout_deadline is None`. + * `flush_in_progress` is set only when a flush event fired *without* + `_stop_requested`; cleared once `pending` drains. + * `should_shutdown` is set only when a flush event fired *with* + `_stop_requested`; once set, the loop exits as soon as `pending` + drains. + """ + queue_get_task: asyncio.Task[KafkaCommitTask] flush_wait_task: asyncio.Task[bool] task_completed_wait_task: asyncio.Task[bool] @@ -177,15 +189,17 @@ def _map_offsets_per_partition( @staticmethod def _extract_ready_prefixes( pending: dict[TopicPartition, list[KafkaCommitTask]], - ) -> dict[TopicPartition, list[KafkaCommitTask]]: + ) -> tuple[dict[TopicPartition, list[KafkaCommitTask]], int]: # Pending lists are maintained in offset order by _insert_sorted. Per partition, find # the first not-done task; tasks before it form the contiguous-done prefix and become # "ready". A cancelled task is treated as a hard boundary: cancelled + everything after # is dropped from pending and added to ready (so task_done() balances # messages_queue.join), while _map_offsets_per_partition stops the offset advance at # the cancelled task so the uncommitted offsets get redelivered on restart - # (at-least-once). + # (at-least-once). Returns (ready, count) so the caller can update its cached + # pending_count without re-summing list lengths. ready: dict[TopicPartition, list[KafkaCommitTask]] = {} + ready_count = 0 empty_partitions: list[TopicPartition] = [] for partition, partition_pending in pending.items(): prefix_end = 0 @@ -200,13 +214,14 @@ def _extract_ready_prefixes( if prefix_end > 0: ready[partition] = partition_pending[:prefix_end] + ready_count += prefix_end del partition_pending[:prefix_end] if not partition_pending: empty_partitions.append(partition) for k in empty_partitions: del pending[k] - return ready + return ready, ready_count async def _commit_partitions(self, ready: dict[TopicPartition, list[KafkaCommitTask]]) -> bool: # Task exception logging is handled by the handler's _finish_task done-callback so @@ -337,9 +352,9 @@ async def _maybe_commit( ) if not commit_triggered: return {} - ready: typing.Final = self._extract_ready_prefixes(state.pending) + ready, ready_count = self._extract_ready_prefixes(state.pending) if ready: - state.pending_count -= sum(len(v) for v in ready.values()) + state.pending_count -= ready_count await self._commit_partitions(ready) return ready diff --git a/faststream_concurrent_aiokafka/rebalance.py b/faststream_concurrent_aiokafka/rebalance.py index b5d2290..291a7fd 100644 --- a/faststream_concurrent_aiokafka/rebalance.py +++ b/faststream_concurrent_aiokafka/rebalance.py @@ -9,7 +9,7 @@ from faststream.kafka import TopicPartition -class ConsumerRebalanceListener(BaseConsumerRebalanceListener): # type: ignore[misc] +class ConsumerRebalanceListener(BaseConsumerRebalanceListener): """Commits all pending offsets when Kafka revokes partitions during rebalance. Without this listener, in-flight message tasks whose offsets have not yet been diff --git a/tests/test_kafka_committer.py b/tests/test_kafka_committer.py index a7abd41..ea62a75 100644 --- a/tests/test_kafka_committer.py +++ b/tests/test_kafka_committer.py @@ -501,8 +501,9 @@ def test_committer_map_offsets_advances_to_max_per_partition(mock_consumer: Mock def test_extract_ready_prefixes_empty_pending() -> None: pending: dict[TopicPartition, list[KafkaCommitTask]] = {} - ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending) + ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending) assert ready == {} + assert ready_count == 0 assert pending == {} @@ -519,9 +520,10 @@ def test_extract_ready_prefixes_all_done(mock_consumer: MockAIOKafkaConsumer) -> ] pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)} - ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending) + ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending) assert ready == {tp: tasks} + assert ready_count == 3 assert pending == {} # partition emptied @@ -550,9 +552,10 @@ def test_extract_ready_prefixes_blocks_on_first_pending(mock_consumer: MockAIOKa ] pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)} - ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending) + ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending) assert ready == {tp: [tasks[0]]} # only the prefix before offset 11 + assert ready_count == 1 assert pending[tp] == [tasks[1], tasks[2]] @@ -585,9 +588,10 @@ def test_extract_ready_prefixes_cancelled_drops_partition(mock_consumer: MockAIO ] pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)} - ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending) + ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending) assert ready == {tp: tasks} # all three included in ready + assert ready_count == 3 assert pending == {} # partition emptied