Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 49 additions & 107 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(self, client, subscription, **configs):
self.config['default_offset_commit_callback'] = self._default_offset_commit_callback

if self.config['group_id'] is not None:
if self.config['api_version'] >= (0, 9):
if self._use_group_apis:
if not self.config['assignors']:
raise Errors.KafkaConfigurationError('Coordinator requires assignors')
if self.config['api_version'] < (0, 10, 1):
Expand All @@ -119,7 +119,7 @@ def __init__(self, client, subscription, **configs):
"and session_timeout_ms")

if self.config['enable_auto_commit']:
if self.config['api_version'] < (0, 8, 1):
if not self._use_offset_apis:
log.warning('Broker version (%s) does not support offset'
' commits; disabling auto-commit.',
self.config['api_version'])
Expand All @@ -143,6 +143,10 @@ def __init__(self, client, subscription, **configs):
self._cluster.request_update()
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

@property
def _use_offset_apis(self):
return self.config['api_version'] >= (0, 8, 1)

def protocol_type(self):
return ConsumerProtocolType

Expand Down Expand Up @@ -202,7 +206,7 @@ def _handle_metadata_update(self, cluster):
def _auto_assign_all_partitions(self):
# For users that use "subscribe" without group support,
# we will simply assign all partitions to this consumer
if self.config['api_version'] < (0, 9):
if not self._use_group_apis:
return True
elif self.config['group_id'] is None:
return True
Expand Down Expand Up @@ -276,7 +280,7 @@ def poll(self, timeout_ms=None):
log.debug('coordinator.poll: timeout in ensure_coordinator_ready; returning early')
return False

if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
if self._use_group_apis and self._subscription.partitions_auto_assigned():
if self.need_rejoin():
# due to a race condition between the initial metadata fetch and the
# initial rebalance, we need to ensure that the metadata is fresh
Expand Down Expand Up @@ -537,7 +541,7 @@ def commit_offsets_async(self, offsets, callback=None):
return future

def _do_commit_offsets_async(self, offsets, callback=None):
if self.config['api_version'] < (0, 8, 1):
if not self._use_offset_apis:
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
Expand All @@ -563,7 +567,7 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):

Raises error on failure
"""
if self.config['api_version'] < (0, 8, 1):
if not self._use_offset_apis:
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
Expand Down Expand Up @@ -634,7 +638,7 @@ def _send_offset_commit_request(self, offsets):
Returns:
Future: indicating whether the commit was successful or not
"""
if self.config['api_version'] < (0, 8, 1):
if not self._use_offset_apis:
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
Expand All @@ -652,8 +656,7 @@ def _send_offset_commit_request(self, offsets):
for tp, offset in offsets.items():
offset_data[tp.topic][tp.partition] = offset

version = self._client.api_version(OffsetCommitRequest, max_version=7)
if version > 1 and self._subscription.partitions_auto_assigned():
if self._use_group_apis and self._subscription.partitions_auto_assigned():
generation = self.generation_if_stable()
else:
generation = Generation.NO_GENERATION
Expand All @@ -676,95 +679,28 @@ def _send_offset_commit_request(self, offsets):
" consumer is not part of an active group for auto partition assignment; it is likely that the consumer"
" was kicked out of the group."))

if version == 0:
request = OffsetCommitRequest[version](
self.group_id,
[(
topic, [(
partition,
offset.offset,
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
elif version == 1:
request = OffsetCommitRequest[version](
self.group_id,
# This api version was only used in v0.8.2, prior to join group apis
# so this always ends up as NO_GENERATION
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
-1, # timestamp, unused
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
elif version <= 4:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
generation.member_id,
-1, # default retention time
[(
topic, [(
partition,
offset.offset,
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
elif version <= 5:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
elif version <= 6:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
offset.leader_epoch,
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
else:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
generation.member_id,
self.group_instance_id,
[(
topic, [(
partition,
offset.offset,
offset.leader_epoch,
offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
_Topic = OffsetCommitRequest.OffsetCommitRequestTopic
_Partition = _Topic.OffsetCommitRequestPartition
request = OffsetCommitRequest(
group_id=self.group_id,
generation_id_or_member_epoch=generation.generation_id,
member_id=generation.member_id,
group_instance_id=self.group_instance_id,
topics=[_Topic(
name=topic, partitions=[_Partition(
partition_index=partition,
committed_offset=offset.offset,
committed_leader_epoch=offset.leader_epoch,
committed_metadata=offset.metadata
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)

log.debug("Sending offset-commit request with %s for group %s to %s",
offsets, self.group_id, node_id)

future = Future()
_f = self._client.send(node_id, request)
_f = self._manager.send(request, node_id=node_id)
_f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic())
_f.add_errback(self._failed_request, node_id, request, future)
return future
Expand Down Expand Up @@ -863,34 +799,40 @@ def _send_offset_fetch_request(self, partitions):
Returns:
Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
"""
if self.config['api_version'] < (0, 8, 1):
if not self._use_offset_apis:
raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker')
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
if not partitions:
if not partitions and partitions is not None:
return Future().success({})

node_id = self.coordinator()
if node_id is None:
return Future().failure(Errors.CoordinatorNotAvailableError)

log.debug("Group %s fetching committed offsets for partitions: %s",
self.group_id, partitions)
self.group_id, '(all)' if partitions is None else partitions)
# construct the request
topic_partitions = collections.defaultdict(set)
for tp in partitions:
topic_partitions[tp.topic].add(tp.partition)

version = self._client.api_version(OffsetFetchRequest, max_version=5)
# Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched
# TODO: support
request = OffsetFetchRequest[version](
self.group_id,
list(topic_partitions.items())
max_version = 7
if partitions is not None:
topic_partitions = collections.defaultdict(set)
for tp in partitions:
topic_partitions[tp.topic].add(tp.partition)
topic_partitions = list(topic_partitions.items())
min_version = 0
else:
topic_partitions = None
min_version = 2

request = OffsetFetchRequest(
group_id=self.group_id,
topics=topic_partitions,
min_version=min_version,
max_version=max_version,
)

# send the request with a callback
future = Future()
_f = self._client.send(node_id, request)
_f = self._manager.send(request, node_id=node_id)
_f.add_callback(self._handle_offset_fetch_response, future)
_f.add_errback(self._failed_request, node_id, request, future)
return future
Expand Down
Loading