From 898e9ad5ab293e241130077f5b74d7cfa30ea36f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 19 May 2026 14:44:41 -0700 Subject: [PATCH] Consumer: send all requests from net io thread --- kafka/consumer/fetcher.py | 3 + kafka/coordinator/base.py | 36 ++++---- kafka/coordinator/consumer.py | 140 +++++++++++++--------------- test/consumer/test_coordinator.py | 148 ++++++++++++++++++------------ 4 files changed, 180 insertions(+), 147 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1beac96f9..2a82e3e8d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -217,6 +217,9 @@ def send_fetches(self): Returns: List of Futures: each future resolves to a FetchResponse """ + return self._manager.run(self._send_fetches_async) + + async def _send_fetches_async(self): futures = [] for node_id, (request, fetch_offsets) in self._create_fetch_requests().items(): log.debug("Sending FetchRequest to node %s", node_id) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 7c70de333..1d310bb73 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -348,7 +348,7 @@ def lookup_coordinator(self): # then _reset_find_coordinator_future will immediately fire and # set _find_coordinator_future = None # To avoid returning None, we capture the future in a local variable - future = self._send_group_coordinator_request() + future = self._manager.call_soon(self._send_group_coordinator_request) self._find_coordinator_future = future self._find_coordinator_future.add_both(self._reset_find_coordinator_future) return future @@ -721,15 +721,18 @@ async def _do_join_and_sync_async(self): sync_request, node_id=self.coordinator_id) return self._process_sync_group_response(sync_response, sync_send_time) - def _send_group_coordinator_request(self): + async def _send_group_coordinator_request(self): """Discover the current coordinator for the group. Returns: - Future: resolves to the node id of the coordinator + node_id of the coordinator on success. + Raises: + NodeNotReadyError if no broker is currently connectable. + Coordinator-related errors (see _handle_find_coordinator_response). """ node_id = self._client.least_loaded_node() if node_id is None: - return Future().failure(Errors.NodeNotReadyError('coordinator')) + raise Errors.NodeNotReadyError('coordinator') max_version = 3 request = FindCoordinatorRequest( @@ -737,13 +740,15 @@ def _send_group_coordinator_request(self): max_version=max_version) log.debug("Sending group coordinator request for group %s to broker %s: %s", self.group_id, node_id, request) - future = Future() - _f = self._manager.send(request, node_id=node_id) - _f.add_callback(self._handle_find_coordinator_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future - def _handle_find_coordinator_response(self, future, response): + try: + response = await self._manager.send(request, node_id=node_id) + except Exception as exc: + self._failed_request(node_id, request, None, exc) + raise + return self._handle_find_coordinator_response(response) + + def _handle_find_coordinator_response(self, response): log.debug("Received find coordinator response %s", response) error_type = Errors.for_code(response.error_code) @@ -753,28 +758,27 @@ def _handle_find_coordinator_response(self, future, response): if not coordinator_id: # This could happen if coordinator metadata is different # than broker metadata - future.failure(Errors.IllegalStateError()) - return + raise Errors.IllegalStateError() self.coordinator_id = coordinator_id log.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) self._client.maybe_connect(self.coordinator_id) self.heartbeat.reset_timeouts() - future.success(self.coordinator_id) + return self.coordinator_id elif error_type is Errors.CoordinatorNotAvailableError: log.debug("Group Coordinator Not Available; retry") - future.failure(error_type()) + raise error_type() elif error_type is Errors.GroupAuthorizationFailedError: error = error_type(self.group_id) log.error("Group Coordinator Request failed: %s", error) - future.failure(error) + raise error else: error = error_type() log.error("Group Coordinator lookup for group %s failed: %s", self.group_id, error) - future.failure(error) + raise error def coordinator_dead(self, error): """Mark the current coordinator as dead.""" diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 163250f34..5d93cedd8 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -504,7 +504,7 @@ async def fetch_committed_offsets_async(self, partitions, timeout_ms=None): if future_key in self._offset_fetch_futures: future = self._offset_fetch_futures[future_key] else: - future = self._send_offset_fetch_request(partitions) + future = self._manager.call_soon(self._send_offset_fetch_request, partitions) self._offset_fetch_futures[future_key] = future try: @@ -594,7 +594,7 @@ def _do_commit_offsets_async(self, offsets, callback=None): offsets.values())) if callback is None: callback = self.config['default_offset_commit_callback'] - future = self._send_offset_commit_request(offsets) + future = self._manager.call_soon(self._send_offset_commit_request, offsets) future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) def _maybe_set_async_commit_fenced(exc): if isinstance(exc, Errors.FencedInstanceIdError): @@ -631,7 +631,7 @@ async def _commit_offsets_sync_async(self, offsets, timeout_ms=None): while True: await self.ensure_coordinator_ready_async(timeout_ms=timer.timeout_ms) - future = self._send_offset_commit_request(offsets) + future = self._manager.call_soon(self._send_offset_commit_request, offsets) try: await self._manager.wait_for(future, timer.timeout_ms) except Errors.KafkaTimeoutError: @@ -672,19 +672,21 @@ def _maybe_auto_commit_offsets_sync(self, timeout_ms=None): log.exception("Offset commit failed: This is likely to cause" " duplicate message delivery") - def _send_offset_commit_request(self, offsets): + async def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. - This is a non-blocking call which returns a request future that can be - polled in the case of a synchronous commit or ignored in the - asynchronous case. - Arguments: offsets (dict of {TopicPartition: OffsetAndMetadata}): what should - be committed + be committed. - Returns: - Future: indicating whether the commit was successful or not + Returns: None on success. + Raises: + UnsupportedVersionError if broker is too old. + CoordinatorNotAvailableError if the coordinator is unknown. + RebalanceInProgressError / CommitFailedError if generation is + not stable. + Other broker-side OffsetCommit errors propagated via + _handle_offset_commit_response. """ if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') @@ -693,11 +695,11 @@ def _send_offset_commit_request(self, offsets): offsets.values())) if not offsets: log.debug('No offsets to commit') - return Future().success(None) + return None node_id = self.coordinator() if node_id is None: - return Future().failure(Errors.CoordinatorNotAvailableError) + raise Errors.CoordinatorNotAvailableError() # create the offset commit request offset_data = collections.defaultdict(dict) @@ -717,15 +719,15 @@ def _send_offset_commit_request(self, offsets): if self.rebalance_in_progress(): # if the client knows it is already rebalancing, we can use RebalanceInProgressError instead of # CommitFailedError to indicate this is not a fatal error - return Future().failure(Errors.RebalanceInProgressError( + raise Errors.RebalanceInProgressError( "Offset commit cannot be completed since the" " consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance" - " by calling poll() and then retry the operation.")) + " by calling poll() and then retry the operation.") else: - return Future().failure(Errors.CommitFailedError( + raise Errors.CommitFailedError( "Offset commit cannot be completed since the" " consumer is not part of an active group for auto partition assignment; it is likely that the consumer" - " was kicked out of the group.")) + " was kicked out of the group.") _Topic = OffsetCommitRequest.OffsetCommitRequestTopic _Partition = _Topic.OffsetCommitRequestPartition @@ -747,13 +749,15 @@ def _send_offset_commit_request(self, offsets): log.debug("Sending offset-commit request with %s for group %s to %s", offsets, self.group_id, node_id) - future = Future() - _f = self._manager.send(request, node_id=node_id) - _f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic()) - _f.add_errback(self._failed_request, node_id, request, future) - return future + send_time = time.monotonic() + try: + response = await self._manager.send(request, node_id=node_id) + except Exception as exc: + self._failed_request(node_id, request, None, exc) + raise + self._handle_offset_commit_response(offsets, send_time, response) - def _handle_offset_commit_response(self, offsets, future, send_time, response): + def _handle_offset_commit_response(self, offsets, send_time, response): log.debug("Received OffsetCommitResponse: %s", response) # TODO look at adding request_latency_ms to response (like java kafka) if self._consumer_sensors: @@ -772,8 +776,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): elif error_type is Errors.GroupAuthorizationFailedError: log.error("Not authorized to commit offsets for group %s", self.group_id) - future.failure(error_type(self.group_id)) - return + raise error_type(self.group_id) elif error_type is Errors.TopicAuthorizationFailedError: unauthorized_topics.add(topic) elif error_type in (Errors.OffsetMetadataTooLargeError, @@ -781,22 +784,19 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): # raise the error to the user log.debug("OffsetCommit for group %s failed on partition %s" " %s", self.group_id, tp, error_type.__name__) - future.failure(error_type()) - return + raise error_type() elif error_type is Errors.CoordinatorLoadInProgressError: # just retry log.debug("OffsetCommit for group %s failed: %s", self.group_id, error_type.__name__) - future.failure(error_type(self.group_id)) - return + raise error_type(self.group_id) elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError): log.debug("OffsetCommit for group %s failed: %s", self.group_id, error_type.__name__) self.coordinator_dead(error_type()) - future.failure(error_type(self.group_id)) - return + raise error_type(self.group_id) elif error_type is Errors.RebalanceInProgressError: # Consumer never tries to commit offset in between join-group and sync-group, # and hence on broker-side it is not expected to see a commit offset request @@ -805,13 +805,11 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): # However, we do not need to reset generations and just request re-join, such that # if the caller decides to proceed and poll, it would still try to proceed and re-join normally. self.request_rejoin() - future.failure(Errors.CommitFailedError(error_type())) - return + raise Errors.CommitFailedError(error_type()) elif error_type is Errors.FencedInstanceIdError: log.error("OffsetCommit for group %s failed due to fenced id error: %s", self.group_id, self.group_instance_id) - future.failure(error_type()) - return + raise error_type() elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): # need reset generation and re-join group @@ -819,43 +817,42 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): log.warning("OffsetCommit for group %s failed: %s", self.group_id, error) self.reset_generation() - future.failure(Errors.CommitFailedError(error_type())) - return + raise Errors.CommitFailedError(error_type()) else: log.error("Group %s failed to commit partition %s at offset" " %s: %s", self.group_id, tp, offset, error_type.__name__) - future.failure(error_type()) - return + raise error_type() if unauthorized_topics: log.error("Not authorized to commit to topics %s for group %s", unauthorized_topics, self.group_id) - future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) - else: - future.success(None) + raise Errors.TopicAuthorizationFailedError(unauthorized_topics) - def _send_offset_fetch_request(self, partitions): + async def _send_offset_fetch_request(self, partitions): """Fetch the committed offsets for a set of partitions. - This is a non-blocking call. The returned future can be polled to get - the actual offsets returned from the broker. - Arguments: - partitions (list of TopicPartition): the partitions to fetch + partitions (list of TopicPartition): the partitions to fetch. Returns: - Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} + dict {TopicPartition: OffsetAndMetadata} on success. + + Raises: + UnsupportedVersionError if broker is too old. + CoordinatorNotAvailableError if the coordinator is unknown. + Other broker-side OffsetFetch errors propagated via + _handle_offset_fetch_response. """ if not self._use_offset_apis: raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) if not partitions and partitions is not None: - return Future().success({}) + return {} node_id = self.coordinator() if node_id is None: - return Future().failure(Errors.CoordinatorNotAvailableError) + raise Errors.CoordinatorNotAvailableError() log.debug("Group %s fetching committed offsets for partitions: %s", self.group_id, '(all)' if partitions is None else partitions) @@ -878,32 +875,29 @@ def _send_offset_fetch_request(self, partitions): max_version=max_version, ) - # send the request with a callback - future = Future() - _f = self._manager.send(request, node_id=node_id) - _f.add_callback(self._handle_offset_fetch_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future + try: + response = await self._manager.send(request, node_id=node_id) + except Exception as exc: + self._failed_request(node_id, request, None, exc) + raise + return self._handle_offset_fetch_response(response) - def _handle_offset_fetch_response(self, future, response): + def _handle_offset_fetch_response(self, response): log.debug("Received OffsetFetchResponse: %s", response) if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno: error_type = Errors.for_code(response.error_code) log.debug("Offset fetch failed: %s", error_type.__name__) error = error_type() - if error_type is Errors.CoordinatorLoadInProgressError: - # Retry - future.failure(error) - elif error_type is Errors.NotCoordinatorError: + if error_type is Errors.NotCoordinatorError: # re-discover the coordinator and retry self.coordinator_dead(error) - future.failure(error) - elif error_type is Errors.GroupAuthorizationFailedError: - future.failure(error) + raise error + elif error_type in (Errors.CoordinatorLoadInProgressError, + Errors.GroupAuthorizationFailedError): + raise error else: log.error("Unknown error fetching offsets: %s", error) - future.failure(error) - return + raise error offsets = {} for topic, partitions in response.topics: @@ -920,13 +914,12 @@ def _handle_offset_fetch_response(self, future, response): error = error_type() log.debug("Group %s failed to fetch offset for partition" " %s: %s", self.group_id, tp, error) - if error_type is Errors.CoordinatorLoadInProgressError: - # just retry - future.failure(error) - elif error_type is Errors.NotCoordinatorError: + if error_type is Errors.NotCoordinatorError: # re-discover the coordinator and retry self.coordinator_dead(error) - future.failure(error) + raise error + elif error_type is Errors.CoordinatorLoadInProgressError: + raise error elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("OffsetFetchRequest -- unknown topic %s" " (have you committed any offsets yet?)", @@ -935,8 +928,7 @@ def _handle_offset_fetch_response(self, future, response): else: log.error("Unknown error fetching offsets for %s: %s", tp, error) - future.failure(error) - return + raise error elif offset >= 0: # record the position with the offset # (-1 indicates no committed offset to fetch) @@ -944,7 +936,7 @@ def _handle_offset_fetch_response(self, future, response): else: log.debug("Group %s has no committed offset for partition" " %s", self.group_id, tp) - future.success(offsets) + return offsets def _default_offset_commit_callback(self, offsets, res_or_exc): if isinstance(res_or_exc, Exception): diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 1fbd7c15c..27b1d6621 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -379,15 +379,21 @@ def test_fetch_committed_offsets(mocker, coordinator): async def _ready(*args, **kwargs): return True mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready) + # _send_offset_fetch_request is an async coroutine; scheduled via + # manager.call_soon so the mock must also be a coroutine function. + async def fake_send_success(_partitions): + return 'foobar' mocker.patch.object(coordinator, '_send_offset_fetch_request', - return_value=Future().success('foobar')) + side_effect=fake_send_success) partitions = [TopicPartition('foobar', 0)] ret = coordinator.fetch_committed_offsets(partitions) assert ret == 'foobar' coordinator._send_offset_fetch_request.assert_called_with(partitions) - # Failed future is raised if not retriable - coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError) + # Non-retriable error is raised + async def fake_send_assertion_error(_partitions): + raise AssertionError + coordinator._send_offset_fetch_request.side_effect = fake_send_assertion_error try: coordinator.fetch_committed_offsets(partitions) except AssertionError: @@ -395,9 +401,13 @@ async def _ready(*args, **kwargs): else: assert False, 'Exception not raised when expected' - coordinator._send_offset_fetch_request.side_effect = [ - Future().failure(Errors.RequestTimedOutError), - Future().success('fizzbuzz')] + # Retriable error then success + async def fake_send_retry_then_success(_partitions): + if not hasattr(fake_send_retry_then_success, 'called'): + fake_send_retry_then_success.called = True + raise Errors.RequestTimedOutError + return 'fizzbuzz' + coordinator._send_offset_fetch_request.side_effect = fake_send_retry_then_success ret = coordinator.fetch_committed_offsets(partitions) assert ret == 'fizzbuzz' @@ -434,12 +444,16 @@ def offsets(): def test_commit_offsets_async(mocker, coordinator, offsets): - mocker.patch.object(coordinator._client, 'poll') mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) mocker.patch.object(coordinator, 'ensure_coordinator_ready') + # _send_offset_commit_request is an async coroutine; scheduled via + # manager.call_soon so we drive the event loop to trigger the mock. + async def fake_send(_offsets): + return 'fizzbuzz' mocker.patch.object(coordinator, '_send_offset_commit_request', - return_value=Future().success('fizzbuzz')) - coordinator.commit_offsets_async(offsets) + side_effect=fake_send) + future = coordinator.commit_offsets_async(offsets) + coordinator._client.poll(future=future, timeout_ms=1000) assert coordinator._send_offset_commit_request.call_count == 1 @@ -447,8 +461,11 @@ def test_commit_offsets_sync(mocker, coordinator, offsets): async def _ready(*args, **kwargs): return True mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready) + + async def fake_send_success(_offsets): + return 'fizzbuzz' mocker.patch.object(coordinator, '_send_offset_commit_request', - return_value=Future().success('fizzbuzz')) + side_effect=fake_send_success) # No offsets, no calls assert coordinator.commit_offsets_sync({}) is None @@ -458,8 +475,10 @@ async def _ready(*args, **kwargs): assert coordinator._send_offset_commit_request.call_count == 1 assert ret == 'fizzbuzz' - # Failed future is raised if not retriable - coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError) + # Non-retriable error is raised + async def fake_send_assertion_error(_offsets): + raise AssertionError + coordinator._send_offset_commit_request.side_effect = fake_send_assertion_error try: coordinator.commit_offsets_sync(offsets) except AssertionError: @@ -467,9 +486,13 @@ async def _ready(*args, **kwargs): else: assert False, 'Exception not raised when expected' - coordinator._send_offset_commit_request.side_effect = [ - Future().failure(Errors.RequestTimedOutError), - Future().success('fizzbuzz')] + # Retriable error is retried, then success + async def fake_send_retry_then_success(_offsets): + if not hasattr(fake_send_retry_then_success, 'called'): + fake_send_retry_then_success.called = True + raise Errors.RequestTimedOutError + return 'fizzbuzz' + coordinator._send_offset_commit_request.side_effect = fake_send_retry_then_success ret = coordinator.commit_offsets_sync(offsets) assert ret == 'fizzbuzz' @@ -532,18 +555,16 @@ def seeded_coord(broker, coordinator): def test_send_offset_commit_request_fail(coordinator, offsets): - # Default coordinator state has coordinator_id=None, so coordinator() - # returns None and the early-return paths fire without any patching. + # _send_offset_commit_request is an async coroutine; run it via the + # selector. Default coordinator state has coordinator_id=None, so + # coordinator() returns None and the no-coordinator path fires. - # No offsets - ret = coordinator._send_offset_commit_request({}) - assert isinstance(ret, Future) - assert ret.succeeded() + # No offsets — coroutine returns None + assert coordinator._net.run(coordinator._send_offset_commit_request, {}) is None - # No coordinator - ret = coordinator._send_offset_commit_request(offsets) - assert ret.failed() - assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) + # No coordinator — coroutine raises + with pytest.raises(Errors.CoordinatorNotAvailableError): + coordinator._net.run(coordinator._send_offset_commit_request, offsets) @pytest.mark.parametrize('broker,version', [ @@ -569,7 +590,8 @@ def handler(api_key, api_version, correlation_id, request_bytes): ])]) broker.respond_fn(OffsetCommitRequest, handler) - future = seeded_coord._send_offset_commit_request(offsets) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_commit_request, offsets) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.succeeded() assert captured['api_version'] == version @@ -580,16 +602,21 @@ def test_send_offset_commit_request_failure(mocker, broker, seeded_coord, offset error = Errors.KafkaConnectionError('simulated transport failure') broker.fail_next(OffsetCommitRequest, error=error) - future = seeded_coord._send_offset_commit_request(offsets) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_commit_request, offsets) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.failed() assert future.exception is error + # The async coro raises on send failure, so the call_soon Future + # carries the exception. _failed_request still fires for its side + # effect (mark coordinator dead); future arg is None since the + # coroutine itself surfaces the exception. assert spy.call_count == 1 node_id, request, call_future, call_error = spy.call_args[0] assert node_id == 0 assert isinstance(request, OffsetCommitRequest) - assert call_future is future + assert call_future is None assert call_error is error @@ -604,14 +631,14 @@ def test_send_offset_commit_request_success(mocker, broker, seeded_coord, offset ])])) spy = mocker.spy(seeded_coord, '_handle_offset_commit_response') - future = seeded_coord._send_offset_commit_request(offsets) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_commit_request, offsets) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.succeeded() assert spy.call_count == 1 - call_offsets, call_future, _send_time, response = spy.call_args[0] + call_offsets, _send_time, response = spy.call_args[0] assert call_offsets == offsets - assert call_future is future assert isinstance(response, OffsetCommitResponse) @@ -657,9 +684,11 @@ def test_send_offset_commit_request_success(mocker, broker, seeded_coord, offset ]) def test_handle_offset_commit_response(coordinator, offsets, response, error, dead): coordinator.coordinator_id = 0 - future = Future() - coordinator._handle_offset_commit_response(offsets, future, time.monotonic(), response) - assert isinstance(future.exception, error) if error else True + if error is None: + coordinator._handle_offset_commit_response(offsets, time.monotonic(), response) + else: + with pytest.raises(error): + coordinator._handle_offset_commit_response(offsets, time.monotonic(), response) assert coordinator.coordinator_id is (None if dead else 0) @@ -669,18 +698,15 @@ def partitions(): def test_send_offset_fetch_request_fail(coordinator, partitions): - # Default coordinator state has coordinator_id=None. + # _send_offset_fetch_request is an async coroutine; run it via the + # selector. Default coordinator state has coordinator_id=None. - # No partitions - ret = coordinator._send_offset_fetch_request([]) - assert isinstance(ret, Future) - assert ret.succeeded() - assert ret.value == {} + # No partitions — coroutine returns {} + assert coordinator._net.run(coordinator._send_offset_fetch_request, []) == {} - # No coordinator - ret = coordinator._send_offset_fetch_request(partitions) - assert ret.failed() - assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) + # No coordinator — coroutine raises + with pytest.raises(Errors.CoordinatorNotAvailableError): + coordinator._net.run(coordinator._send_offset_fetch_request, partitions) @pytest.mark.parametrize('broker,version', [ @@ -710,7 +736,8 @@ def handler(api_key, api_version, correlation_id, request_bytes): ])]) broker.respond_fn(OffsetFetchRequest, handler) - future = seeded_coord._send_offset_fetch_request(partitions) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_fetch_request, partitions) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.succeeded() assert captured['api_version'] == version @@ -721,16 +748,20 @@ def test_send_offset_fetch_request_failure(mocker, broker, seeded_coord, partiti error = Errors.KafkaConnectionError('simulated transport failure') broker.fail_next(OffsetFetchRequest, error=error) - future = seeded_coord._send_offset_fetch_request(partitions) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_fetch_request, partitions) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.failed() assert future.exception is error + # The async coro raises on send failure; the call_soon Future carries + # the exception. _failed_request still fires for its side effect + # (mark coordinator dead) with future=None. assert spy.call_count == 1 node_id, request, call_future, call_error = spy.call_args[0] assert node_id == 0 assert isinstance(request, OffsetFetchRequest) - assert call_future is future + assert call_future is None assert call_error is error @@ -748,14 +779,14 @@ def test_send_offset_fetch_request_success(mocker, broker, seeded_coord, partiti ])])) spy = mocker.spy(seeded_coord, '_handle_offset_fetch_response') - future = seeded_coord._send_offset_fetch_request(partitions) + future = seeded_coord._manager.call_soon( + seeded_coord._send_offset_fetch_request, partitions) seeded_coord._client.poll(future=future, timeout_ms=5000) assert future.succeeded() assert future.value == offsets assert spy.call_count == 1 - call_future, response = spy.call_args[0] - assert call_future is future + (response,) = spy.call_args[0] assert isinstance(response, OffsetFetchResponse) @@ -785,13 +816,11 @@ def test_send_offset_fetch_request_success(mocker, broker, seeded_coord, partiti ]) def test_handle_offset_fetch_response(coordinator, offsets, response, error, dead): coordinator.coordinator_id = 0 - future = Future() - coordinator._handle_offset_fetch_response(future, response) if error is not None: - assert isinstance(future.exception, error) + with pytest.raises(error): + coordinator._handle_offset_fetch_response(response) else: - assert future.succeeded() - assert future.value == offsets + assert coordinator._handle_offset_fetch_response(response) == offsets assert coordinator.coordinator_id is (None if dead else 0) @@ -1265,8 +1294,13 @@ async def _hang(*args, **kwargs): def test_lookup_coordinator_failure(mocker, coordinator): - + # _send_group_coordinator_request is now an async coroutine scheduled + # via manager.call_soon, so we drive the event loop to let the mock + # fire before asserting on the returned future. + async def fake_send(): + raise Exception('foobar') mocker.patch.object(coordinator, '_send_group_coordinator_request', - return_value=Future().failure(Exception('foobar'))) + side_effect=fake_send) future = coordinator.lookup_coordinator() + coordinator._client.poll(future=future, timeout_ms=1000) assert future.failed()