Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions faststream_concurrent_aiokafka/batch_committer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ class KafkaCommitTask:

@dataclasses.dataclass(kw_only=True, slots=True)
class _StreamingState:
"""Mutable state for the streaming committer loop.

Invariants maintained by `_streaming_iteration`:
* `pending_count == sum(len(v) for v in pending.values())`.
* `pending` empty ⇒ `timeout_deadline is None`.
* `flush_in_progress` is set only when a flush event fired *without*
`_stop_requested`; cleared once `pending` drains.
* `should_shutdown` is set only when a flush event fired *with*
`_stop_requested`; once set, the loop exits as soon as `pending`
drains.
"""

queue_get_task: asyncio.Task[KafkaCommitTask]
flush_wait_task: asyncio.Task[bool]
task_completed_wait_task: asyncio.Task[bool]
Expand Down Expand Up @@ -177,15 +189,17 @@ def _map_offsets_per_partition(
@staticmethod
def _extract_ready_prefixes(
pending: dict[TopicPartition, list[KafkaCommitTask]],
) -> dict[TopicPartition, list[KafkaCommitTask]]:
) -> tuple[dict[TopicPartition, list[KafkaCommitTask]], int]:
# Pending lists are maintained in offset order by _insert_sorted. Per partition, find
# the first not-done task; tasks before it form the contiguous-done prefix and become
# "ready". A cancelled task is treated as a hard boundary: cancelled + everything after
# is dropped from pending and added to ready (so task_done() balances
# messages_queue.join), while _map_offsets_per_partition stops the offset advance at
# the cancelled task so the uncommitted offsets get redelivered on restart
# (at-least-once).
# (at-least-once). Returns (ready, count) so the caller can update its cached
# pending_count without re-summing list lengths.
ready: dict[TopicPartition, list[KafkaCommitTask]] = {}
ready_count = 0
empty_partitions: list[TopicPartition] = []
for partition, partition_pending in pending.items():
prefix_end = 0
Expand All @@ -200,13 +214,14 @@ def _extract_ready_prefixes(

if prefix_end > 0:
ready[partition] = partition_pending[:prefix_end]
ready_count += prefix_end
del partition_pending[:prefix_end]
if not partition_pending:
empty_partitions.append(partition)

for k in empty_partitions:
del pending[k]
return ready
return ready, ready_count

async def _commit_partitions(self, ready: dict[TopicPartition, list[KafkaCommitTask]]) -> bool:
# Task exception logging is handled by the handler's _finish_task done-callback so
Expand Down Expand Up @@ -337,9 +352,9 @@ async def _maybe_commit(
)
if not commit_triggered:
return {}
ready: typing.Final = self._extract_ready_prefixes(state.pending)
ready, ready_count = self._extract_ready_prefixes(state.pending)
if ready:
state.pending_count -= sum(len(v) for v in ready.values())
state.pending_count -= ready_count
await self._commit_partitions(ready)
return ready

Expand Down
2 changes: 1 addition & 1 deletion faststream_concurrent_aiokafka/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from faststream.kafka import TopicPartition


class ConsumerRebalanceListener(BaseConsumerRebalanceListener): # type: ignore[misc]
class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
"""Commits all pending offsets when Kafka revokes partitions during rebalance.

Without this listener, in-flight message tasks whose offsets have not yet been
Expand Down
12 changes: 8 additions & 4 deletions tests/test_kafka_committer.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,9 @@ def test_committer_map_offsets_advances_to_max_per_partition(mock_consumer: Mock

def test_extract_ready_prefixes_empty_pending() -> None:
pending: dict[TopicPartition, list[KafkaCommitTask]] = {}
ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending)
ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending)
assert ready == {}
assert ready_count == 0
assert pending == {}


Expand All @@ -519,9 +520,10 @@ def test_extract_ready_prefixes_all_done(mock_consumer: MockAIOKafkaConsumer) ->
]
pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)}

ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending)
ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending)

assert ready == {tp: tasks}
assert ready_count == 3
assert pending == {} # partition emptied


Expand Down Expand Up @@ -550,9 +552,10 @@ def test_extract_ready_prefixes_blocks_on_first_pending(mock_consumer: MockAIOKa
]
pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)}

ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending)
ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending)

assert ready == {tp: [tasks[0]]} # only the prefix before offset 11
assert ready_count == 1
assert pending[tp] == [tasks[1], tasks[2]]


Expand Down Expand Up @@ -585,9 +588,10 @@ def test_extract_ready_prefixes_cancelled_drops_partition(mock_consumer: MockAIO
]
pending: dict[TopicPartition, list[KafkaCommitTask]] = {tp: list(tasks)}

ready: typing.Final = KafkaBatchCommitter._extract_ready_prefixes(pending)
ready, ready_count = KafkaBatchCommitter._extract_ready_prefixes(pending)

assert ready == {tp: tasks} # all three included in ready
assert ready_count == 3
assert pending == {} # partition emptied


Expand Down
Loading