Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ def __init__(self, client, subscriptions, **configs):
self._subscriptions = subscriptions
self._completed_fetches = collections.deque() # Unparsed responses
self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
self._paused_completed_fetches = {} # tp -> CompletedFetch (raw)
self._paused_partition_records = {} # tp -> PartitionRecords (parsed)
self._iterator = None
self._fetch_futures = collections.deque()
if self.config['metrics']:
Expand Down Expand Up @@ -474,17 +476,38 @@ def fetched_records(self, max_records=None, update_offsets=True):
fetched_partition = None
fetched_offset = -1

# KAFKA-7548: restore parked data for any partition that the user
# has since resumed. Raw completions go back into the fetch queue;
# parsed records take the in-line slot when free, otherwise stay
# parked and get picked up on a subsequent call.
for tp in list(self._paused_completed_fetches):
if not self._subscriptions.is_paused(tp):
self._completed_fetches.append(self._paused_completed_fetches.pop(tp))
if self._next_partition_records is None:
for tp in list(self._paused_partition_records):
if not self._subscriptions.is_paused(tp):
self._next_partition_records = self._paused_partition_records.pop(tp)
break

try:
while records_remaining > 0:
if not self._next_partition_records:
if not self._completed_fetches:
break
completion = self._completed_fetches.popleft()
if self._subscriptions.is_paused(completion.topic_partition):
self._paused_completed_fetches[completion.topic_partition] = completion
continue
fetched_partition = completion.topic_partition
fetched_offset = completion.fetched_offset
self._next_partition_records = self._parse_fetched_data(completion)
else:
fetched_partition = self._next_partition_records.topic_partition
tp = self._next_partition_records.topic_partition
if self._subscriptions.is_paused(tp):
self._paused_partition_records[tp] = self._next_partition_records
self._next_partition_records = None
continue
fetched_partition = tp
fetched_offset = self._next_partition_records.next_fetch_offset
records_remaining -= self._append(drained,
self._next_partition_records,
Expand Down Expand Up @@ -1110,6 +1133,8 @@ def _fetchable_partitions(self):
current = self._next_partition_records
if current:
discard.add(current.topic_partition)
discard.update(self._paused_completed_fetches)
discard.update(self._paused_partition_records)
return [tp for tp in fetchable if tp not in discard]

def _create_fetch_requests(self):
Expand Down Expand Up @@ -1368,6 +1393,10 @@ def _on_partition_records_drain(self, partition_records):
def close(self):
    """Release buffered partition records and forget parked fetch data.

    Drains the in-line ``_next_partition_records`` (when one is set) and
    every parsed entry parked in ``_paused_partition_records``, then
    empties both parked maps and resets
    ``_next_in_line_exception_metadata`` to ``None``.
    """
    if self._next_partition_records is not None:
        self._next_partition_records.drain()
    # Iterate over a snapshot, mirroring the list(...) copies taken when
    # parked entries are restored in fetched_records(): drain() runs code
    # outside this method, and a dict that changes size while its view is
    # being iterated raises RuntimeError.
    for parked in list(self._paused_partition_records.values()):
        parked.drain()
    self._paused_partition_records.clear()
    # Raw (unparsed) completions are simply dropped.
    self._paused_completed_fetches.clear()
    self._next_in_line_exception_metadata = None

class PartitionRecords:
Expand Down
112 changes: 112 additions & 0 deletions test/consumer/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,118 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
assert partition_record is None


# KAFKA-7548: paused partitions should retain their prefetched data so
# resume() doesn't have to refetch from the broker.


def _paused_msgs(n):
return [(None, b'msg-%d' % i, None) for i in range(n)]


def test_fetched_records_parks_raw_completion_for_paused_partition(fetcher, topic):
    """Raw completions for a paused partition are parked, then restored.

    While the partition is paused, fetched_records() must return nothing
    for it and move the queued completion into _paused_completed_fetches.
    After resume(), the next fetched_records() call must return all five
    messages and remove the parked entry.
    """
    fetcher.config['check_crcs'] = False
    partition = TopicPartition(topic, 0)

    fetcher._subscriptions.pause(partition)
    queued_fetch = _build_completed_fetch(partition, _paused_msgs(5))
    fetcher._completed_fetches.append(queued_fetch)

    # Paused: nothing is returned and the raw completion leaves the queue.
    while_paused, partial = fetcher.fetched_records()
    assert while_paused == {}
    assert partial is False, 'paused parked data must not block other fetches'
    assert partition in fetcher._paused_completed_fetches
    assert not fetcher._completed_fetches

    # Resumed: the parked completion is restored, parsed and handed out.
    fetcher._subscriptions.resume(partition)
    after_resume, _ = fetcher.fetched_records()
    assert partition in after_resume
    assert len(after_resume[partition]) == 5
    assert partition not in fetcher._paused_completed_fetches


def test_fetched_records_parks_parsed_records_on_pause_between_calls(fetcher, topic):
    """Partially consumed parsed records survive a pause/resume cycle.

    Ten messages are queued and four are consumed, leaving the remainder
    in _next_partition_records. Pausing the partition must move that
    remainder into _paused_partition_records instead of discarding it,
    and the six leftover messages must be returned once resumed.
    """
    fetcher.config['check_crcs'] = False
    partition = TopicPartition(topic, 0)
    fetcher._completed_fetches.append(
        _build_completed_fetch(partition, _paused_msgs(10)))

    # Consume only part of the batch so the in-line slot stays occupied.
    first_batch, _ = fetcher.fetched_records(max_records=4)
    assert len(first_batch[partition]) == 4
    assert fetcher._next_partition_records is not None

    # Pausing between calls must park the remainder rather than drain it.
    fetcher._subscriptions.pause(partition)
    while_paused, partial = fetcher.fetched_records()
    assert while_paused == {}
    assert partial is False
    assert fetcher._next_partition_records is None
    assert partition in fetcher._paused_partition_records

    # After resume the leftover records are delivered on the next call.
    fetcher._subscriptions.resume(partition)
    after_resume, _ = fetcher.fetched_records()
    assert partition in after_resume
    assert len(after_resume[partition]) == 6
    assert partition not in fetcher._paused_partition_records


def test_fetched_records_other_partitions_progress_while_one_paused(fetcher, topic):
    """Parked data for a paused partition does not stall active partitions.

    With a paused partition's completion queued ahead of an active one, a
    single fetched_records() call must park the former and still return
    all three records of the latter.
    """
    fetcher.config['check_crcs'] = False
    paused_partition = TopicPartition(topic, 0)
    active_partition = TopicPartition(topic, 1)

    fetcher._subscriptions.pause(paused_partition)
    # The paused partition's completion is queued first on purpose.
    for partition, count in ((paused_partition, 5), (active_partition, 3)):
        fetcher._completed_fetches.append(
            _build_completed_fetch(partition, _paused_msgs(count)))

    drained, partial = fetcher.fetched_records()
    assert paused_partition not in drained
    assert active_partition in drained
    assert len(drained[active_partition]) == 3
    assert paused_partition in fetcher._paused_completed_fetches
    assert partial is False


def test_fetchable_partitions_excludes_parked_pause_data(fetcher, topic, mocker):
    """A partition with parked data is not fetchable, even after resume().

    Between resume() and the next fetched_records() call the parked entry
    is still pending, so requesting the partition again would be a
    redundant fetch.

    NOTE(review): only the raw map (_paused_completed_fetches) is covered
    here; this test does not first confirm that the partition would be
    fetchable without parked data -- verify against the fixture.
    """
    fetcher.config['check_crcs'] = False
    partition = TopicPartition(topic, 0)

    fetcher._subscriptions.pause(partition)
    parked_fetch = _build_completed_fetch(partition, _paused_msgs(5))
    fetcher._paused_completed_fetches[partition] = parked_fetch
    fetcher._subscriptions.resume(partition)

    # Resumed, but the parked completion has not been consumed yet.
    fetchable = fetcher._fetchable_partitions()
    assert partition not in fetchable


def test_close_drains_parked_paused_records(fetcher, topic):
    """close() leaves no parked data behind.

    Parsed records are parked by consuming part of a batch, pausing the
    partition and calling fetched_records() again; after close() both
    parked maps must be empty.

    NOTE(review): only emptiness of the maps is asserted; that drain() was
    invoked on the parked records is not checked directly.
    """
    fetcher.config['check_crcs'] = False
    partition = TopicPartition(topic, 0)
    fetcher._completed_fetches.append(
        _build_completed_fetch(partition, _paused_msgs(5)))

    # Partial consumption fills _next_partition_records ...
    fetcher.fetched_records(max_records=2)
    # ... and the call after pause() moves it to the parked map.
    fetcher._subscriptions.pause(partition)
    fetcher.fetched_records()
    assert partition in fetcher._paused_partition_records

    fetcher.close()
    assert not fetcher._paused_partition_records
    assert not fetcher._paused_completed_fetches


def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
Expand Down
Loading