diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 7eb828c2f..1b00a2492 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -109,7 +109,7 @@ def __init__(self, client, subscription, **configs): self.config['default_offset_commit_callback'] = self._default_offset_commit_callback if self.config['group_id'] is not None: - if self.config['api_version'] >= (0, 9): + if self._use_group_apis: if not self.config['assignors']: raise Errors.KafkaConfigurationError('Coordinator requires assignors') if self.config['api_version'] < (0, 10, 1): @@ -119,7 +119,7 @@ def __init__(self, client, subscription, **configs): "and session_timeout_ms") if self.config['enable_auto_commit']: - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: log.warning('Broker version (%s) does not support offset' ' commits; disabling auto-commit.', self.config['api_version']) @@ -143,6 +143,10 @@ def __init__(self, client, subscription, **configs): self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + @property + def _use_offset_apis(self): + return self.config['api_version'] >= (0, 8, 1) + def protocol_type(self): return ConsumerProtocolType @@ -202,7 +206,7 @@ def _handle_metadata_update(self, cluster): def _auto_assign_all_partitions(self): # For users that use "subscribe" without group support, # we will simply assign all partitions to this consumer - if self.config['api_version'] < (0, 9): + if not self._use_group_apis: return True elif self.config['group_id'] is None: return True @@ -276,7 +280,7 @@ def poll(self, timeout_ms=None): log.debug('coordinator.poll: timeout in ensure_coordinator_ready; returning early') return False - if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if self._use_group_apis and self._subscription.partitions_auto_assigned(): if self.need_rejoin(): # due to a race condition between the initial metadata fetch and the # initial rebalance, we need to ensure that the metadata is fresh @@ -537,7 +541,7 @@ def commit_offsets_async(self, offsets, callback=None): return future def _do_commit_offsets_async(self, offsets, callback=None): - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -563,7 +567,7 @@ def commit_offsets_sync(self, offsets, timeout_ms=None): Raises error on failure """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -634,7 +638,7 @@ def _send_offset_commit_request(self, offsets): Returns: Future: indicating whether the commit was successful or not """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -652,8 +656,7 @@ def _send_offset_commit_request(self, offsets): for tp, offset in offsets.items(): offset_data[tp.topic][tp.partition] = offset - version = self._client.api_version(OffsetCommitRequest, max_version=7) - if version > 1 and self._subscription.partitions_auto_assigned(): + if self._use_group_apis and self._subscription.partitions_auto_assigned(): generation = self.generation_if_stable() else: generation = Generation.NO_GENERATION @@ -676,95 +679,28 @@ def _send_offset_commit_request(self, offsets): " consumer is not part of an active group for auto partition assignment; it is likely that the consumer" " was kicked out of the group.")) - if version == 0: - request = OffsetCommitRequest[version]( - self.group_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version == 1: - request = OffsetCommitRequest[version]( - self.group_id, - # This api version was only used in v0.8.2, prior to join group apis - # so this always ends up as NO_GENERATION - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - -1, # timestamp, unused - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 4: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - -1, # default retention time - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 5: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - elif version <= 6: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - [( - topic, [( - partition, - offset.offset, - offset.leader_epoch, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) - else: - request = OffsetCommitRequest[version]( - self.group_id, - generation.generation_id, - generation.member_id, - self.group_instance_id, - [( - topic, [( - partition, - offset.offset, - offset.leader_epoch, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] - ) + _Topic = OffsetCommitRequest.OffsetCommitRequestTopic + _Partition = _Topic.OffsetCommitRequestPartition + request = OffsetCommitRequest( + group_id=self.group_id, + generation_id_or_member_epoch=generation.generation_id, + member_id=generation.member_id, + group_instance_id=self.group_instance_id, + topics=[_Topic( + name=topic, partitions=[_Partition( + partition_index=partition, + committed_offset=offset.offset, + committed_leader_epoch=offset.leader_epoch, + committed_metadata=offset.metadata + ) for partition, offset in partitions.items()] + ) for topic, partitions in offset_data.items()] + ) log.debug("Sending offset-commit request with %s for group %s to %s", offsets, self.group_id, node_id) future = Future() - _f = self._client.send(node_id, request) + _f = self._manager.send(request, node_id=node_id) _f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic()) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -863,10 +799,10 @@ def _send_offset_fetch_request(self, partitions): Returns: Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ - if self.config['api_version'] < (0, 8, 1): + if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) - if not partitions: + if not partitions and partitions is not None: return Future().success({}) node_id = self.coordinator() @@ -874,23 +810,29 @@ def _send_offset_fetch_request(self, partitions): return Future().failure(Errors.CoordinatorNotAvailableError) log.debug("Group %s fetching committed offsets for partitions: %s", - self.group_id, partitions) + self.group_id, '(all)' if partitions is None else partitions) # construct the request - topic_partitions = collections.defaultdict(set) - for tp in partitions: - topic_partitions[tp.topic].add(tp.partition) - - version = self._client.api_version(OffsetFetchRequest, max_version=5) - # Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched - # TODO: support - request = OffsetFetchRequest[version]( - self.group_id, - list(topic_partitions.items()) + max_version = 7 + if partitions is not None: + topic_partitions = collections.defaultdict(set) + for tp in partitions: + topic_partitions[tp.topic].add(tp.partition) + topic_partitions = list(topic_partitions.items()) + min_version = 0 + else: + topic_partitions = None + min_version = 2 + + request = OffsetFetchRequest( + group_id=self.group_id, + topics=topic_partitions, + min_version=min_version, + max_version=max_version, ) # send the request with a callback future = Future() - _f = self._client.send(node_id, request) + _f = self._manager.send(request, node_id=node_id) _f.add_callback(self._handle_offset_fetch_response, future) _f.add_errback(self._failed_request, node_id, request, future) return future