diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
index ce3621f42d..2b7ae490c2 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java
@@ -95,7 +95,7 @@
*
Note: When provided, this value will override the group id property
* in the consumer factory configuration, unless {@link #idIsGroup()}
* is set to false or {@link #groupId()} is provided.
- *
SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
+ *
SpEL {@code #{...}} and property placeholders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index f611178907..c3a2ad1738 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -172,6 +172,7 @@
* @author Christian Fredriksson
* @author Timofey Barabanov
* @author Janek Lasocki-Biczysko
+ * @author Mikhail Polivakha
*/
public class KafkaMessageListenerContainer // NOSONAR line count
extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher {
@@ -340,7 +341,7 @@ public void resumePartition(TopicPartition topicPartition) {
}
private void consumerWakeIfNecessary() {
- KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer;
+ ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
consumer.wakeIfNecessary();
}
@@ -913,14 +914,23 @@ else if (listener instanceof MessageListener) {
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();
- if (!AopUtils.isAopProxy(this.genericListener) &&
- this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter, ?>) {
+ if (!AopUtils.isAopProxy(this.genericListener)) {
- KafkaBackoffAwareMessageListenerAdapter genListener =
- (KafkaBackoffAwareMessageListenerAdapter) this.genericListener;
- if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter adapterListener) {
- // This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic.
- adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
+ if (this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter, ?>) {
+ KafkaBackoffAwareMessageListenerAdapter genListener =
+ (KafkaBackoffAwareMessageListenerAdapter) this.genericListener;
+
+ if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter adapterListener) {
+ // This means that the async retry feature is supported only for SingleRecordListener when used with @RetryableTopic.
+ adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
+ }
+ }
+
+ if (this.genericListener instanceof RecordMessagingMessageListenerAdapter, ?>) {
+ RecordMessagingMessageListenerAdapter recordListnener =
+ (RecordMessagingMessageListenerAdapter) this.genericListener;
+
+ recordListnener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}
}
diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt
index 3033da54e2..86f4782619 100644
--- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt
+++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerConfig
@@ -35,7 +36,10 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.listener.CommonErrorHandler
+import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.listener.KafkaListenerErrorHandler
+import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
@@ -53,12 +57,13 @@ import java.util.concurrent.TimeUnit
*
* @author Wang ZhiYang
* @author Artem Bilan
+ * @author Mikhail Polivakha
*
* @since 3.1
*/
@SpringJUnitConfig
@DirtiesContext
-@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
+@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", "kotlinAsyncTestTopicContainerLevelHandler",
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1)
class EnableKafkaKotlinCoroutinesTests {
@@ -87,6 +92,12 @@ class EnableKafkaKotlinCoroutinesTests {
assertThat(this.config.error).isTrue()
}
+ @Test
+ fun `test checked ex - container level handler should be invoked`() {
+ this.template.send("kotlinAsyncTestTopicContainerLevelHandler", "fail")
+ assertThat(this.config.latchForCommonHandler.await(10, TimeUnit.SECONDS)).isTrue()
+ }
+
@Test
fun `test batch listener`() {
this.template.send("kotlinAsyncBatchTestTopic1", "foo")
@@ -142,6 +153,8 @@ class EnableKafkaKotlinCoroutinesTests {
val latch2 = CountDownLatch(1)
+ val latchForCommonHandler = CountDownLatch(1)
+
val batchLatch1 = CountDownLatch(1)
val batchLatch2 = CountDownLatch(1)
@@ -190,6 +203,21 @@ class EnableKafkaKotlinCoroutinesTests {
}
}
+ @Bean
+ fun defaultErrorHandler(): CommonErrorHandler {
+ return object : CommonErrorHandler {
+ override fun handleOne(
+ thrownException: java.lang.Exception,
+ record: ConsumerRecord<*, *>,
+ consumer: Consumer<*, *>,
+ container: MessageListenerContainer
+ ): Boolean {
+ latchForCommonHandler.countDown()
+ return super.handleOne(thrownException, record, consumer, container)
+ }
+ }
+ }
+
@Bean
fun errorHandlerBatch() : KafkaListenerErrorHandler {
return KafkaListenerErrorHandler { message, _ ->
@@ -200,11 +228,12 @@ class EnableKafkaKotlinCoroutinesTests {
}
@Bean
- fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory {
+ fun kafkaListenerContainerFactory(commonErrorHandler: CommonErrorHandler): ConcurrentKafkaListenerContainerFactory {
val factory: ConcurrentKafkaListenerContainerFactory
= ConcurrentKafkaListenerContainerFactory()
factory.setConsumerFactory(kcf())
factory.setReplyTemplate(kt())
+ factory.setCommonErrorHandler(commonErrorHandler)
return factory
}
@@ -247,6 +276,16 @@ class EnableKafkaKotlinCoroutinesTests {
}
}
+ @KafkaListener(
+ id = "kotlin-ex-container-level-handler",
+ topics = ["kotlinAsyncTestTopicContainerLevelHandler"],
+ containerFactory = "kafkaListenerContainerFactory")
+ suspend fun listenExContainerLevelHandlerInvoked(value: String) {
+ if (value == "fail") {
+ throw RuntimeException("checked")
+ }
+ }
+
}
}