diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index ec27794e6d938..38074ebb7aecb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1990,6 +1990,13 @@ private Fetch pollForFetches(Timer timer) { } } + // If the background thread has already discovered a metadata error (e.g. TopicAuthorizationException) + // and completed the inflight poll event, skip blocking on the fetch buffer entirely. The error will + // be surfaced by checkInflightPoll on the next iteration of the poll loop. See KAFKA-20397. + if (inflightPoll != null && inflightPoll.error().isPresent()) { + return collectFetch(); + } + log.trace("Polling for fetches with timeout {}", pollTimeout); Timer pollTimer = time.timer(pollTimeout); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 9743da654b258..0623550d37972 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -71,6 +71,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.WakeupException; @@ -1736,6 +1737,73 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() 
{ verify(applicationEventHandler, times(1)).add(isA(AsyncPollEvent.class)); } + /** + * KAFKA-20397: verifies that a metadata error discovered by the background thread between + * {@code checkInflightPoll()} and the blocking wait in {@code pollForFetches()} is surfaced + * promptly, without wasting the full fetch-wait interval. + * + *
<p>
Scenario: {@code maximumTimeToWait()} returns 100 ms (shorter than the 500 ms poll + * timeout). The background thread completes the inflight {@link AsyncPollEvent} with a + * {@link TopicAuthorizationException} before {@code pollForFetches()} reaches the blocking + * {@code fetchBuffer.awaitWakeup()} call. + */ + @Test + public void testPollSurfacesMetadataErrorWithoutWastingFetchWaitInterval() { + FetchBuffer fetchBuffer = mock(FetchBuffer.class); + ConsumerInterceptors interceptors = mock(ConsumerInterceptors.class); + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = mock(ConsumerRebalanceListenerInvoker.class); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer(fetchBuffer, interceptors, rebalanceListenerInvoker, subscriptions); + + final String topicName = "topic1"; + final TopicPartition tp = new TopicPartition(topicName, 0); + + subscriptions.assignFromUser(singleton(tp)); + subscriptions.seek(tp, 0); + + final long fetchWaitMs = 100L; + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + + AtomicReference capturedEvent = new AtomicReference<>(); + doAnswer(invocation -> { + capturedEvent.set(invocation.getArgument(0)); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); + + AtomicBoolean errorInjected = new AtomicBoolean(false); + + // Inject the error inside maximumTimeToWait(), which is the first call in + // pollForFetches — after checkInflightPoll has already submitted the event. + // This models the background thread completing the event with a metadata error + // between checkInflightPoll and the blocking wait. 
+ doAnswer(invocation -> { + if (!errorInjected.get() && capturedEvent.get() != null) { + capturedEvent.get().onMetadataError( + new TopicAuthorizationException(singleton(topicName))); + errorInjected.set(true); + } + return fetchWaitMs; + }).when(applicationEventHandler).maximumTimeToWait(); + + doAnswer(invocation -> { + Timer pollTimer = invocation.getArgument(0, Timer.class); + ((MockTime) time).sleep(pollTimer.remainingMs()); + return null; + }).when(fetchBuffer).awaitWakeup(any(Timer.class)); + + final long pollTimeoutMs = 500; + long startMs = time.milliseconds(); + + assertThrows(TopicAuthorizationException.class, + () -> consumer.poll(Duration.ofMillis(pollTimeoutMs))); + long elapsedMs = time.milliseconds() - startMs; + + assertTrue(elapsedMs < fetchWaitMs, + "Expected error to be surfaced promptly (elapsed " + elapsedMs + + " ms), but consumer wasted a full fetch-wait interval (" + fetchWaitMs + " ms)"); + } + /** * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents} * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.