diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2a82e3e8d..85d5df394 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -124,6 +124,8 @@ def __init__(self, client, subscriptions, **configs): self._subscriptions = subscriptions self._completed_fetches = collections.deque() # Unparsed responses self._next_partition_records = None # Holds a single PartitionRecords until fully consumed + self._paused_completed_fetches = {} # tp -> CompletedFetch (raw) + self._paused_partition_records = {} # tp -> PartitionRecords (parsed) self._iterator = None self._fetch_futures = collections.deque() if self.config['metrics']: @@ -474,17 +476,38 @@ def fetched_records(self, max_records=None, update_offsets=True): fetched_partition = None fetched_offset = -1 + # KAFKA-7548: restore parked data for any partition that the user + # has since resumed. Raw completions go back into the fetch queue; + # parsed records take the in-line slot when free, otherwise stay + # parked and get picked up on a subsequent call. + for tp in list(self._paused_completed_fetches): + if not self._subscriptions.is_paused(tp): + self._completed_fetches.append(self._paused_completed_fetches.pop(tp)) + if self._next_partition_records is None: + for tp in list(self._paused_partition_records): + if not self._subscriptions.is_paused(tp): + self._next_partition_records = self._paused_partition_records.pop(tp) + break + try: while records_remaining > 0: if not self._next_partition_records: if not self._completed_fetches: break completion = self._completed_fetches.popleft() + if self._subscriptions.is_paused(completion.topic_partition): + self._paused_completed_fetches[completion.topic_partition] = completion + continue fetched_partition = completion.topic_partition fetched_offset = completion.fetched_offset self._next_partition_records = self._parse_fetched_data(completion) else: - fetched_partition = self._next_partition_records.topic_partition + tp = self._next_partition_records.topic_partition + if self._subscriptions.is_paused(tp): + self._paused_partition_records[tp] = self._next_partition_records + self._next_partition_records = None + continue + fetched_partition = tp fetched_offset = self._next_partition_records.next_fetch_offset records_remaining -= self._append(drained, self._next_partition_records, @@ -1110,6 +1133,8 @@ def _fetchable_partitions(self): current = self._next_partition_records if current: discard.add(current.topic_partition) + discard.update(self._paused_completed_fetches) + discard.update(self._paused_partition_records) return [tp for tp in fetchable if tp not in discard] def _create_fetch_requests(self): @@ -1368,6 +1393,10 @@ def _on_partition_records_drain(self, partition_records): def close(self): if self._next_partition_records is not None: self._next_partition_records.drain() + for parked in self._paused_partition_records.values(): + parked.drain() + self._paused_partition_records.clear() + self._paused_completed_fetches.clear() self._next_in_line_exception_metadata = None class PartitionRecords: diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index 8af30098a..e6b3ddd68 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -657,6 +657,118 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker): assert partition_record is None +# KAFKA-7548: paused partitions should retain their prefetched data so +# resume() doesn't have to refetch from the broker. + + +def _paused_msgs(n): + return [(None, b'msg-%d' % i, None) for i in range(n)] + + +def test_fetched_records_parks_raw_completion_for_paused_partition(fetcher, topic): + """A CompletedFetch for a paused partition is parked, not parsed/drained, + and is restored on the next fetched_records() once resumed.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + + fetcher._subscriptions.pause(tp) + fetcher._completed_fetches.append(_build_completed_fetch(tp, _paused_msgs(5))) + + records, partial = fetcher.fetched_records() + assert records == {} + assert partial is False, 'paused parked data must not block other fetches' + assert tp in fetcher._paused_completed_fetches + assert not fetcher._completed_fetches + + # Resume and pull again: parked completion gets restored, parsed, drained. + fetcher._subscriptions.resume(tp) + records, _ = fetcher.fetched_records() + assert tp in records + assert len(records[tp]) == 5 + assert tp not in fetcher._paused_completed_fetches + + +def test_fetched_records_parks_parsed_records_on_pause_between_calls(fetcher, topic): + """If a partition is paused between two fetched_records() calls while + parsed records still sit in _next_partition_records, the parsed records + are parked (not dropped) and returned on the next call after resume.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + + fetcher._completed_fetches.append(_build_completed_fetch(tp, _paused_msgs(10))) + # First call: pull 4 records, leave the rest in _next_partition_records. + records, _ = fetcher.fetched_records(max_records=4) + assert len(records[tp]) == 4 + assert fetcher._next_partition_records is not None + + # Pause between calls. Next call must park (not drain) the remainder. + fetcher._subscriptions.pause(tp) + records, partial = fetcher.fetched_records() + assert records == {} + assert partial is False + assert fetcher._next_partition_records is None + assert tp in fetcher._paused_partition_records + + # Resume — remainder comes back on the next call. + fetcher._subscriptions.resume(tp) + records, _ = fetcher.fetched_records() + assert tp in records + assert len(records[tp]) == 6 + assert tp not in fetcher._paused_partition_records + + +def test_fetched_records_other_partitions_progress_while_one_paused(fetcher, topic): + """A paused partition with parked data must not block other partitions' + records from being returned in the same fetched_records() call.""" + fetcher.config['check_crcs'] = False + tp_paused = TopicPartition(topic, 0) + tp_active = TopicPartition(topic, 1) + + fetcher._subscriptions.pause(tp_paused) + fetcher._completed_fetches.append(_build_completed_fetch(tp_paused, _paused_msgs(5))) + fetcher._completed_fetches.append(_build_completed_fetch(tp_active, _paused_msgs(3))) + + records, partial = fetcher.fetched_records() + assert tp_paused not in records + assert tp_active in records + assert len(records[tp_active]) == 3 + assert tp_paused in fetcher._paused_completed_fetches + assert partial is False + + +def test_fetchable_partitions_excludes_parked_pause_data(fetcher, topic, mocker): + """A partition with parked (paused) data must not be picked up by the + fetch-request side after resume() but before the next fetched_records() + drains the parked entry — otherwise we'd issue a redundant fetch.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + + fetcher._subscriptions.pause(tp) + fetcher._paused_completed_fetches[tp] = _build_completed_fetch(tp, _paused_msgs(5)) + fetcher._subscriptions.resume(tp) + + # Even though the partition is now resumed, parked data is pending — + # exclude from the fetchable set so we don't refetch. + assert tp not in fetcher._fetchable_partitions() + + +def test_close_drains_parked_paused_records(fetcher, topic): + """close() must release any parked records so the test fixture doesn't + leak memory and the metric aggregators see their per-tp record.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + + fetcher._completed_fetches.append(_build_completed_fetch(tp, _paused_msgs(5))) + fetcher.fetched_records(max_records=2) # populates _next_partition_records + fetcher._subscriptions.pause(tp) + fetcher.fetched_records() # parks _next_partition_records + assert tp in fetcher._paused_partition_records + + fetcher.close() + assert not fetcher._paused_partition_records + assert not fetcher._paused_completed_fetches + + def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0)