From ceca8011f007f9ba931dc981e3d5e2c1e9ac1a88 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:21:23 -0700 Subject: [PATCH 1/6] _use_group_apis --- kafka/coordinator/consumer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 7eb828c2f..d932d5ddd 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -109,7 +109,7 @@ def __init__(self, client, subscription, **configs): self.config['default_offset_commit_callback'] = self._default_offset_commit_callback if self.config['group_id'] is not None: - if self.config['api_version'] >= (0, 9): + if self._use_group_apis: if not self.config['assignors']: raise Errors.KafkaConfigurationError('Coordinator requires assignors') if self.config['api_version'] < (0, 10, 1): @@ -202,7 +202,7 @@ def _handle_metadata_update(self, cluster): def _auto_assign_all_partitions(self): # For users that use "subscribe" without group support, # we will simply assign all partitions to this consumer - if self.config['api_version'] < (0, 9): + if not self._use_group_apis: return True elif self.config['group_id'] is None: return True @@ -276,7 +276,7 @@ def poll(self, timeout_ms=None): log.debug('coordinator.poll: timeout in ensure_coordinator_ready; returning early') return False - if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if self._use_group_apis and self._subscription.partitions_auto_assigned(): if self.need_rejoin(): # due to a race condition between the initial metadata fetch and the # initial rebalance, we need to ensure that the metadata is fresh @@ -653,7 +653,7 @@ def _send_offset_commit_request(self, offsets): offset_data[tp.topic][tp.partition] = offset version = self._client.api_version(OffsetCommitRequest, max_version=7) - if version > 1 and self._subscription.partitions_auto_assigned(): + if self._use_group_apis and self._subscription.partitions_auto_assigned(): generation = self.generation_if_stable() else: generation = Generation.NO_GENERATION From 5d4728fcd595eea8f2dff75eb5944a1e704d5bb5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:21:50 -0700 Subject: [PATCH 2/6] _use_offset_apis --- kafka/coordinator/consumer.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index d932d5ddd..ea1ab34ef 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -119,7 +119,7 @@ def __init__(self, client, subscription, **configs): "and session_timeout_ms") if self.config['enable_auto_commit']: - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: log.warning('Broker version (%s) does not support offset' ' commits; disabling auto-commit.', self.config['api_version']) @@ -143,6 +143,10 @@ def __init__(self, client, subscription, **configs): self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + @property + def _use_offset_apis(self): + return self.config['api_version'] >= (0, 8, 1) + def protocol_type(self): return ConsumerProtocolType @@ -537,7 +541,7 @@ def commit_offsets_async(self, offsets, callback=None): return future def _do_commit_offsets_async(self, offsets, callback=None): - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -563,7 +567,7 @@ def commit_offsets_sync(self, offsets, timeout_ms=None): Raises error on failure """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -634,7 +638,7 @@ def _send_offset_commit_request(self, offsets): Returns: Future: indicating whether the commit was successful or not """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -863,7 +867,7 @@ def _send_offset_fetch_request(self, partitions): Returns: Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) if not partitions: From 5d8311a8a75e49a25d44f4b88d1320f35bfa04c0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:22:41 -0700 Subject: [PATCH 3/6] simplify OffsetCommitRequest construction --- kafka/coordinator/consumer.py | 100 ++++++---------------------------- 1 file changed, 16 insertions(+), 84 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ea1ab34ef..af2243b3f 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -656,7 +656,6 @@ def _send_offset_commit_request(self, offsets): for tp, offset in offsets.items(): offset_data[tp.topic][tp.partition] = offset - version = self._client.api_version(OffsetCommitRequest, max_version=7) if self._use_group_apis and self._subscription.partitions_auto_assigned(): generation = self.generation_if_stable() else: @@ -680,89 +679,22 @@ def _send_offset_commit_request(self, offsets): " consumer is not part of an active group for auto partition assignment; it is likely that the consumer" " was kicked out of the group.")) - if version == 0: - request = OffsetCommitRequest[version]( - self.group_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version == 1: - request = OffsetCommitRequest[version]( - self.group_id, - # This api version was only used in v0.8.2, prior to join group apis - # so this always ends up as NO_GENERATION - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - -1, # timestamp, unused - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 4: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - -1, # default retention time - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 5: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 6: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - offset.leader_epoch, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - else: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - self.group_instance_id, - [( - topic, [( - partition, - offset.offset, - offset.leader_epoch, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) + _Topic = OffsetCommitRequest.OffsetCommitRequestTopic + _Partition = _Topic.OffsetCommitRequestPartition + request = OffsetCommitRequest( + group_id=self.group_id, + generation_id_or_member_epoch=generation.generation_id, + member_id=generation.member_id, + group_instance_id=self.group_instance_id, + topics=[_Topic( + name=topic, partitions=[_Partition( + partition_index=partition, + committed_offset=offset.offset, + committed_leader_epoch=offset.leader_epoch, + committed_metadata=offset.metadata + ) for partition, offset in partitions.items()] + ) for topic, partitions in offset_data.items()] + ) log.debug("Sending offset-commit request with %s for group %s to %s", offsets, self.group_id, node_id) From 26d4cbba3fd8220041366e05b58bfc571b36e85a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:25:34 -0700 Subject: [PATCH 4/6] simplify OffsetFetchRequest construction --- kafka/coordinator/consumer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index af2243b3f..502b4e286 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -816,12 +816,13 @@ def _send_offset_fetch_request(self, partitions): for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - version = self._client.api_version(OffsetFetchRequest, max_version=5) # Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched # TODO: support - request = OffsetFetchRequest[version]( - self.group_id, - list(topic_partitions.items()) + max_version = 7 + request = OffsetFetchRequest( + group_id=self.group_id, + topics=list(topic_partitions.items()), + max_version=max_version, ) # send the request with a callback From 9a1c405969cc30dfbd6037fbf946bd22eeb10878 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:26:11 -0700 Subject: [PATCH 5/6] Support offset fetch request for all topics via None --- kafka/coordinator/consumer.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 502b4e286..af89019cf 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -802,7 +802,7 @@ def _send_offset_fetch_request(self, partitions): if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) - if not partitions: + if not partitions and partitions is not None: return Future().success({}) node_id = self.coordinator() @@ -810,18 +810,23 @@ def _send_offset_fetch_request(self, partitions): return Future().failure(Errors.CoordinatorNotAvailableError) log.debug("Group %s fetching committed offsets for partitions: %s", - self.group_id, partitions) + self.group_id, '(all)' if partitions is None else partitions) # construct the request - topic_partitions = collections.defaultdict(set) - for tp in partitions: - topic_partitions[tp.topic].add(tp.partition) - - # Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched - # TODO: support max_version = 7 + if partitions is not None: + topic_partitions = collections.defaultdict(set) + for tp in partitions: + topic_partitions[tp.topic].add(tp.partition) + topic_partitions = list(topic_partitions.items()) + min_version = 0 + else: + topic_partitions = None + min_version = 2 + request = OffsetFetchRequest( group_id=self.group_id, - topics=list(topic_partitions.items()), + topics=topic_partitions, + min_version=min_version, max_version=max_version, ) From 5c65edcb4609b809776908dde7c4e5ad777629f8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 May 2026 08:26:20 -0700 Subject: [PATCH 6/6] client.send -> manager.send --- kafka/coordinator/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index af89019cf..1b00a2492 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -700,7 +700,7 @@ def _send_offset_commit_request(self, offsets): offsets, self.group_id, node_id) future = Future() - _f = self._client.send(node_id, request) + _f = self._manager.send(request, node_id=node_id) _f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic()) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -832,7 +832,7 @@ def _send_offset_fetch_request(self, partitions): # send the request with a callback future = Future() - _f = self._client.send(node_id, request) + _f = self._manager.send(request, node_id=node_id) _f.add_callback(self._handle_offset_fetch_response, future) _f.add_errback(self._failed_request, node_id, request, future) return future