Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def send_fetches(self):
Returns:
List of Futures: each future resolves to a FetchResponse
"""
return self._manager.run(self._send_fetches_async)

async def _send_fetches_async(self):
futures = []
for node_id, (request, fetch_offsets) in self._create_fetch_requests().items():
log.debug("Sending FetchRequest to node %s", node_id)
Expand Down
36 changes: 20 additions & 16 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def lookup_coordinator(self):
# then _reset_find_coordinator_future will immediately fire and
# set _find_coordinator_future = None
# To avoid returning None, we capture the future in a local variable
future = self._send_group_coordinator_request()
future = self._manager.call_soon(self._send_group_coordinator_request)
self._find_coordinator_future = future
self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
return future
Expand Down Expand Up @@ -721,29 +721,34 @@ async def _do_join_and_sync_async(self):
sync_request, node_id=self.coordinator_id)
return self._process_sync_group_response(sync_response, sync_send_time)

def _send_group_coordinator_request(self):
async def _send_group_coordinator_request(self):
"""Discover the current coordinator for the group.

Returns:
Future: resolves to the node id of the coordinator
node_id of the coordinator on success.
Raises:
NodeNotReadyError if no broker is currently connectable.
Coordinator-related errors (see _handle_find_coordinator_response).
"""
node_id = self._client.least_loaded_node()
if node_id is None:
return Future().failure(Errors.NodeNotReadyError('coordinator'))
raise Errors.NodeNotReadyError('coordinator')

max_version = 3
request = FindCoordinatorRequest(
key=self.group_id,
max_version=max_version)
log.debug("Sending group coordinator request for group %s to broker %s: %s",
self.group_id, node_id, request)
future = Future()
_f = self._manager.send(request, node_id=node_id)
_f.add_callback(self._handle_find_coordinator_response, future)
_f.add_errback(self._failed_request, node_id, request, future)
return future

def _handle_find_coordinator_response(self, future, response):
try:
response = await self._manager.send(request, node_id=node_id)
except Exception as exc:
self._failed_request(node_id, request, None, exc)
raise
return self._handle_find_coordinator_response(response)

def _handle_find_coordinator_response(self, response):
log.debug("Received find coordinator response %s", response)

error_type = Errors.for_code(response.error_code)
Expand All @@ -753,28 +758,27 @@ def _handle_find_coordinator_response(self, future, response):
if not coordinator_id:
# This could happen if coordinator metadata is different
# than broker metadata
future.failure(Errors.IllegalStateError())
return
raise Errors.IllegalStateError()

self.coordinator_id = coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
self._client.maybe_connect(self.coordinator_id)
self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)
return self.coordinator_id

elif error_type is Errors.CoordinatorNotAvailableError:
log.debug("Group Coordinator Not Available; retry")
future.failure(error_type())
raise error_type()
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
log.error("Group Coordinator Request failed: %s", error)
future.failure(error)
raise error
else:
error = error_type()
log.error("Group Coordinator lookup for group %s failed: %s",
self.group_id, error)
future.failure(error)
raise error

def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
Expand Down
Loading
Loading