From c7546266869c7d21c7896af4e42bfd9050e7119b Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Tue, 7 Apr 2026 19:16:31 -0500 Subject: [PATCH 1/3] KAFKA-20397 Avoid blocking on fetchBuffer when metadata error is pending --- .../internals/AsyncKafkaConsumer.java | 7 ++ .../internals/AsyncKafkaConsumerTest.java | 64 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 1022bf6aab3b5..97a7da32e780a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1995,6 +1995,13 @@ private Fetch pollForFetches(Timer timer) { } } + // If the background thread has already discovered a metadata error (e.g. TopicAuthorizationException) + // and completed the inflight poll event, skip blocking on the fetch buffer entirely. The error will + // be surfaced by checkInflightPoll on the next iteration of the poll loop. See KAFKA-20397. 
+ if (inflightPoll != null && inflightPoll.error().isPresent()) { + return collectFetch(); + } + log.trace("Polling for fetches with timeout {}", pollTimeout); Timer pollTimer = time.timer(pollTimeout); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index a8736329e4dfa..0a8a11cf515e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -71,6 +71,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.WakeupException; @@ -1824,6 +1825,69 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() { verify(applicationEventHandler, times(1)).add(isA(AsyncPollEvent.class)); } + /** + * KAFKA-20397: verifies that a metadata error discovered by the background thread between + * {@code checkInflightPoll()} and the blocking wait in {@code pollForFetches()} is surfaced + * promptly, without wasting the full fetch-wait interval. + * + *

<p>Scenario: {@code maximumTimeToWait()} returns 100 ms (shorter than the 500 ms poll + * timeout). The background thread completes the inflight {@link AsyncPollEvent} with a + * {@link TopicAuthorizationException} before {@code pollForFetches()} reaches the blocking + * {@code fetchBuffer.awaitWakeup()} call. + */ + @Test + public void testPollSurfacesMetadataErrorWithoutWastingFetchWaitInterval() { + FetchBuffer fetchBuffer = mock(FetchBuffer.class); + ConsumerInterceptors interceptors = mock(ConsumerInterceptors.class); + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = mock(ConsumerRebalanceListenerInvoker.class); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer(fetchBuffer, interceptors, rebalanceListenerInvoker, subscriptions); + + final String topicName = "topic1"; + final TopicPartition tp = new TopicPartition(topicName, 0); + + subscriptions.assignFromUser(singleton(tp)); + subscriptions.seek(tp, 0); + + final long fetchWaitMs = 100L; + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + + AtomicReference<AsyncPollEvent> capturedEvent = new AtomicReference<>(); + doAnswer(invocation -> { + capturedEvent.set(invocation.getArgument(0)); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); + + AtomicBoolean errorInjected = new AtomicBoolean(false); + + doAnswer(invocation -> { + if (!errorInjected.get() && capturedEvent.get() != null) { + capturedEvent.get().onMetadataError( + new TopicAuthorizationException(singleton(topicName))); + errorInjected.set(true); + } + return fetchWaitMs; + }).when(applicationEventHandler).maximumTimeToWait(); + + doAnswer(invocation -> { + Timer pollTimer = invocation.getArgument(0, Timer.class); + ((MockTime) time).sleep(pollTimer.remainingMs()); + return null; +
}).when(fetchBuffer).awaitWakeup(any(Timer.class)); + + final long pollTimeoutMs = 500; + long startMs = time.milliseconds(); + + assertThrows(TopicAuthorizationException.class, + () -> consumer.poll(Duration.ofMillis(pollTimeoutMs))); + long elapsedMs = time.milliseconds() - startMs; + + assertTrue(elapsedMs < fetchWaitMs, + "Expected error to be surfaced promptly (elapsed " + elapsedMs + + " ms), but consumer wasted a full fetch-wait interval (" + fetchWaitMs + " ms)"); + } + /** * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents} * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout. From 98ee0a674721425381463f88cb08b078d2f3075c Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 23:05:30 -0500 Subject: [PATCH 2/3] Fixing reviewer comment --- .../clients/consumer/internals/AsyncKafkaConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 0a8a11cf515e5..8688f4e40ba6e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1805,7 +1805,7 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() { doReturn(100L).when(applicationEventHandler).maximumTimeToWait(); doAnswer(invocation -> { Timer pollTimer = invocation.getArgument(0, Timer.class); - ((MockTime) time).sleep(pollTimer.remainingMs()); + time.sleep(pollTimer.remainingMs()); return null; }).when(fetchBuffer).awaitWakeup(any(Timer.class)); @@ -1872,7 +1872,7 @@ public void testPollSurfacesMetadataErrorWithoutWastingFetchWaitInterval() { doAnswer(invocation -> { Timer pollTimer = 
invocation.getArgument(0, Timer.class); - ((MockTime) time).sleep(pollTimer.remainingMs()); + time.sleep(pollTimer.remainingMs()); return null; }).when(fetchBuffer).awaitWakeup(any(Timer.class)); From 65250906f75ab6223963c16f8578bf10e6390fb5 Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 23:31:10 -0500 Subject: [PATCH 3/3] Fixing reviewer comment --- .../consumer/internals/AsyncKafkaConsumerTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8688f4e40ba6e..cc119cafb3e1f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1859,13 +1859,14 @@ public void testPollSurfacesMetadataErrorWithoutWastingFetchWaitInterval() { return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); - AtomicBoolean errorInjected = new AtomicBoolean(false); - + // Inject the error inside maximumTimeToWait(), which is the first call in + // pollForFetches — after checkInflightPoll has already submitted the event. + // This models the background thread completing the event with a metadata error + // between checkInflightPoll and the blocking wait. doAnswer(invocation -> { - if (!errorInjected.get() && capturedEvent.get() != null) { + if (capturedEvent.get() != null) { capturedEvent.get().onMetadataError( new TopicAuthorizationException(singleton(topicName))); - errorInjected.set(true); } return fetchWaitMs; }).when(applicationEventHandler).maximumTimeToWait();