From dc63a5de21d0be953fa25d105a0be78eb0b7e08f Mon Sep 17 00:00:00 2001 From: mipo256 Date: Sun, 23 Nov 2025 13:10:10 +0300 Subject: [PATCH] GH-3618: Consider CommonErrorHandler for suspended functions Signed-off-by: mipo256 --- .../kafka/annotation/KafkaListener.java | 2 +- .../KafkaMessageListenerContainer.java | 26 +++++++---- .../EnableKafkaKotlinCoroutinesTests.kt | 43 ++++++++++++++++++- 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index ce3621f42d..2b7ae490c2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -95,7 +95,7 @@ *

Note: When provided, this value will override the group id property * in the consumer factory configuration, unless {@link #idIsGroup()} * is set to false or {@link #groupId()} is provided. - *

SpEL {@code #{...}} and property place holders {@code ${...}} are supported. + *

SpEL {@code #{...}} and property placeholders {@code ${...}} are supported. * @return the {@code id} for the container managing for this endpoint. * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String) */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f611178907..c3a2ad1738 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -172,6 +172,7 @@ * @author Christian Fredriksson * @author Timofey Barabanov * @author Janek Lasocki-Biczysko + * @author Mikhail Polivakha */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -340,7 +341,7 @@ public void resumePartition(TopicPartition topicPartition) { } private void consumerWakeIfNecessary() { - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; + ListenerConsumer consumer = this.listenerConsumer; if (consumer != null) { consumer.wakeIfNecessary(); } @@ -913,14 +914,23 @@ else if (listener instanceof MessageListener) { this.pollThreadStateProcessor = setUpPollProcessor(false); this.observationEnabled = this.containerProperties.isObservationEnabled(); - if (!AopUtils.isAopProxy(this.genericListener) && - this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter) { + if (!AopUtils.isAopProxy(this.genericListener)) { - KafkaBackoffAwareMessageListenerAdapter genListener = - (KafkaBackoffAwareMessageListenerAdapter) this.genericListener; - if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter adapterListener) { - // This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic. - adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure); + if (this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter) { + KafkaBackoffAwareMessageListenerAdapter genListener = + (KafkaBackoffAwareMessageListenerAdapter) this.genericListener; + + if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter adapterListener) { + // This means that the async retry feature is supported only for SingleRecordListener when used with @RetryableTopic. + adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure); + } + } + + if (this.genericListener instanceof RecordMessagingMessageListenerAdapter) { + RecordMessagingMessageListenerAdapter recordListnener = + (RecordMessagingMessageListenerAdapter) this.genericListener; + + recordListnener.setCallbackForAsyncFailure(this::callbackForAsyncFailure); } } } diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index 3033da54e2..86f4782619 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -14,6 +14,7 @@ * limitations under the License. */ +import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerConfig @@ -35,7 +36,10 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.listener.CommonErrorHandler +import org.springframework.kafka.listener.DefaultErrorHandler import org.springframework.kafka.listener.KafkaListenerErrorHandler +import org.springframework.kafka.listener.MessageListenerContainer import org.springframework.kafka.support.Acknowledgment import org.springframework.kafka.test.EmbeddedKafkaBroker import org.springframework.kafka.test.context.EmbeddedKafka @@ -53,12 +57,13 @@ import java.util.concurrent.TimeUnit * * @author Wang ZhiYang * @author Artem Bilan + * @author Mikhail Polivakha * * @since 3.1 */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", +@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", "kotlinAsyncTestTopicContainerLevelHandler", "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1) class EnableKafkaKotlinCoroutinesTests { @@ -87,6 +92,12 @@ class EnableKafkaKotlinCoroutinesTests { assertThat(this.config.error).isTrue() } + @Test + fun `test checked ex - container level handler should be invoked`() { + this.template.send("kotlinAsyncTestTopicContainerLevelHandler", "fail") + assertThat(this.config.latchForCommonHandler.await(10, TimeUnit.SECONDS)).isTrue() + } + @Test fun `test batch listener`() { this.template.send("kotlinAsyncBatchTestTopic1", "foo") @@ -142,6 +153,8 @@ class EnableKafkaKotlinCoroutinesTests { val latch2 = CountDownLatch(1) + val latchForCommonHandler = CountDownLatch(1) + val batchLatch1 = CountDownLatch(1) val batchLatch2 = CountDownLatch(1) @@ -190,6 +203,21 @@ class EnableKafkaKotlinCoroutinesTests { } } + @Bean + fun defaultErrorHandler(): CommonErrorHandler { + return object : CommonErrorHandler { + override fun handleOne( + thrownException: java.lang.Exception, + record: ConsumerRecord<*, *>, + consumer: Consumer<*, *>, + container: MessageListenerContainer + ): Boolean { + latchForCommonHandler.countDown() + return super.handleOne(thrownException, record, consumer, container) + } + } + } + @Bean fun errorHandlerBatch() : KafkaListenerErrorHandler { return KafkaListenerErrorHandler { message, _ -> @@ -200,11 +228,12 @@ class EnableKafkaKotlinCoroutinesTests { } @Bean - fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + fun kafkaListenerContainerFactory(commonErrorHandler: CommonErrorHandler): ConcurrentKafkaListenerContainerFactory { val factory: ConcurrentKafkaListenerContainerFactory = ConcurrentKafkaListenerContainerFactory() factory.setConsumerFactory(kcf()) factory.setReplyTemplate(kt()) + factory.setCommonErrorHandler(commonErrorHandler) return factory } @@ -247,6 +276,16 @@ class EnableKafkaKotlinCoroutinesTests { } } + @KafkaListener( + id = "kotlin-ex-container-level-handler", + topics = ["kotlinAsyncTestTopicContainerLevelHandler"], + containerFactory = "kafkaListenerContainerFactory") + suspend fun listenExContainerLevelHandlerInvoked(value: String) { + if (value == "fail") { + throw RuntimeException("checked") + } + } + } }