From 61eb968c550694cd5bf4c6257c718ff8a614d7af Mon Sep 17 00:00:00 2001 From: gobeomjun Date: Sat, 22 Nov 2025 14:38:20 +0900 Subject: [PATCH 1/5] Add per-listener ackMode attribute to @KafkaListener Allows individual Kafka listeners to specify different acknowledgment modes without creating multiple container factory beans. This addresses the need to handle different reliability requirements across listeners in the same application (e.g., critical transactions with manual acknowledgment, notifications with batch acknowledgment, and analytics with record acknowledgment). Key changes: - Added ackMode() attribute to @KafkaListener annotation supporting all ContainerProperties.AckMode values (RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE) - Supports SpEL expressions and property placeholders - Endpoint-level ackMode overrides factory default when specified - Added resolveAckMode() to KafkaListenerAnnotationBeanPostProcessor to process the annotation attribute - Updated endpoint infrastructure (KafkaListenerEndpoint, AbstractKafkaListenerEndpoint, AbstractKafkaListenerContainerFactory) to store and apply ackMode - Added comprehensive tests in KafkaListenerAckModeTests Example usage: @KafkaListener(topics = "critical", ackMode = "MANUAL") @KafkaListener(topics = "notifications", ackMode = "BATCH") Fixes GH-4174 Signed-off-by: gobeomjun --- .../kafka/annotation/KafkaListener.java | 10 ++ ...kaListenerAnnotationBeanPostProcessor.java | 11 ++ ...AbstractKafkaListenerContainerFactory.java | 6 + .../config/AbstractKafkaListenerEndpoint.java | 21 +++ .../kafka/config/KafkaListenerEndpoint.java | 11 ++ .../annotation/KafkaListenerAckModeTests.java | 163 ++++++++++++++++++ 6 files changed, 222 insertions(+) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index ce3621f42d..f6c02ce6f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -338,4 +338,14 @@ * @since 3.1 */ String containerPostProcessor() default ""; + + /** + * Override the container factory's default {@code ackMode} for this listener. + *

+ * Supports SpEL {@code #{...}} and property placeholders {@code ${...}}. + * @return the ack mode (case-insensitive), or empty string to use factory default. + * @since 4.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + String ackMode() default ""; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f0c5f3faba..c757bc0ecc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -693,6 +693,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint en resolveErrorHandler(endpoint, kafkaListener); resolveContentTypeConverter(endpoint, kafkaListener); resolveFilter(endpoint, kafkaListener); + resolveAckMode(endpoint, kafkaListener); resolveContainerPostProcessor(endpoint, kafkaListener); } @@ -740,6 +741,16 @@ private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaList } } + private void resolveAckMode(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { + String ackMode = kafkaListener.ackMode(); + if (StringUtils.hasText(ackMode)) { + String ackModeValue = resolveExpressionAsString(ackMode, "ackMode"); + if (StringUtils.hasText(ackModeValue)) { + endpoint.setAckMode(ackModeValue); + } + } + } + private @Nullable KafkaListenerContainerFactory resolveContainerFactory(KafkaListener kafkaListener, @Nullable Object factoryTarget, String beanName) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index d26ef01dc6..3e14ccdc8c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -457,6 +457,12 @@ else if (this.autoStartup != null) { .acceptIfNotNull(endpoint.getConsumerProperties(), instance.getContainerProperties()::setKafkaConsumerProperties) .acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo); + + // Set ackMode if specified in endpoint (overrides factory default) + String ackMode = endpoint.getAckMode(); + if (ackMode != null) { + properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase())); + } } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 76b201300a..d2325c9c0b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -125,6 +125,8 @@ public abstract class AbstractKafkaListenerEndpoint @Nullable private String mainListenerId; + private @Nullable String ackMode; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -387,6 +389,25 @@ public void setAutoStartup(@Nullable Boolean autoStartup) { this.autoStartup = autoStartup; } + /** + * Return the ackMode for this endpoint's container. + * @return the ackMode. + * @since 4.1 + */ + public @Nullable String getAckMode() { + return this.ackMode; + } + + /** + * Set the ackMode for this endpoint's container to override the factory's default. + * @param ackMode the ackMode string (case-insensitive). + * @since 4.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + public void setAckMode(@Nullable String ackMode) { + this.ackMode = ackMode; + } + /** * Set a configurer which will be invoked when creating a reply message. * @param replyHeadersConfigurer the configurer. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 16f4123430..6aab0c641c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -187,4 +187,15 @@ default Boolean getBatchListener() { return null; } + /** + * Return the ackMode for this endpoint, or null if not explicitly set. + * @return the ack mode string. + * @since 4.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + @Nullable + default String getAckMode() { + return null; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java new file mode 100644 index 0000000000..5d12f40c9b --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java @@ -0,0 +1,163 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link KafkaListener} ackMode attribute. + * + * @author GO BEOMJUN + * @since 4.1 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = {"ackModeRecord", "ackModeManual", "ackModeDefault"}, partitions = 1) +public class KafkaListenerAckModeTests { + + @Autowired + private KafkaTemplate template; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + @Test + void testAckModeRecordOverride() throws Exception { + this.template.send("ackModeRecord", "test-record"); + assertThat(this.config.recordLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // Verify that the listener container has the correct ack mode + MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.RECORD); + } + + @Test + void testAckModeManualOverride() throws Exception { + this.template.send("ackModeManual", "test-manual"); + assertThat(this.config.manualLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // Verify that the listener container has the correct ack mode + MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.MANUAL); + } + + @Test + void testAckModeDefault() throws Exception { + this.template.send("ackModeDefault", "test-default"); + assertThat(this.config.defaultLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // Verify that the listener container uses factory default (BATCH) + MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.BATCH); + } + + @Configuration + @EnableKafka + public static class Config { + + final CountDownLatch recordLatch = new CountDownLatch(1); + + final CountDownLatch manualLatch = new CountDownLatch(1); + + final CountDownLatch defaultLatch = new CountDownLatch(1); + + @Bean + public ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { + Map consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(broker, "testGroup", false)); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(consumerProps); + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(broker)); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + // Set factory default to BATCH + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); + return factory; + } + + @KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD") + public void listenWithRecordAck(String message) { + this.recordLatch.countDown(); + } + + @KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL") + public void listenWithManualAck(String message, Acknowledgment ack) { + ack.acknowledge(); + this.manualLatch.countDown(); + } + + @KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault") + public void listenWithDefaultAck(String message) { + this.defaultLatch.countDown(); + } + + } + +} From b791695a22f9e67c4f81126ea3370c38e4b4e2a3 Mon Sep 17 00:00:00 2001 From: gobeomjun Date: Tue, 25 Nov 2025 01:39:30 +0900 Subject: [PATCH 2/5] Add ackMode attribute to @KafkaListener annotation This commit introduces a new ackMode attribute to the @KafkaListener annotation, enabling per-listener acknowledgment mode configuration. This allows developers to override the container factory's default ackMode on a per-listener basis. The ackMode attribute accepts string values corresponding to ContainerProperties.AckMode enum values (e.g., "RECORD", "BATCH", "MANUAL", "MANUAL_IMMEDIATE"). Signed-off-by: gobeomjun --- .../org/springframework/kafka/annotation/KafkaListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index f6c02ce6f5..95a5bb202e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -344,7 +344,7 @@ *

* Supports SpEL {@code #{...}} and property placeholders {@code ${...}}. * @return the ack mode (case-insensitive), or empty string to use factory default. - * @since 4.1 + * @since 4.0.1 * @see org.springframework.kafka.listener.ContainerProperties.AckMode */ String ackMode() default ""; From e1b0dcd54ee1e1b4b2bf85ee077d6a4efb11c776 Mon Sep 17 00:00:00 2001 From: gobeomjun Date: Tue, 25 Nov 2025 01:39:56 +0900 Subject: [PATCH 3/5] Refactor ackMode configuration to use JavaUtils pattern Replace explicit null check and manual ackMode setting with the JavaUtils.acceptIfNotNull() pattern for consistency with other property configurations in the initializeContainer method. This improves code readability and maintains a uniform style throughout the property initialization logic. Signed-off-by: gobeomjun --- .../config/AbstractKafkaListenerContainerFactory.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 3e14ccdc8c..4cd73e5af8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -456,13 +456,9 @@ else if (this.autoStartup != null) { .acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId) .acceptIfNotNull(endpoint.getConsumerProperties(), instance.getContainerProperties()::setKafkaConsumerProperties) - .acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo); - - // Set ackMode if specified in endpoint (overrides factory default) - String ackMode = endpoint.getAckMode(); - if (ackMode != null) { - properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase())); - } + .acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo) + .acceptIfNotNull(endpoint.getAckMode(), ackMode -> + properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase()))); } @Override From a81377fa2652a0b59f566906b155901dad929c87 Mon Sep 17 00:00:00 2001 From: gobeomjun Date: Tue, 25 Nov 2025 01:41:05 +0900 Subject: [PATCH 4/5] Add tests for @KafkaListener ackMode attribute Signed-off-by: gobeomjun --- .../annotation/KafkaListenerAckModeTests.java | 87 ++++--------------- 1 file changed, 15 insertions(+), 72 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java index 5d12f40c9b..26e1ece352 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java @@ -16,14 +16,6 @@ package org.springframework.kafka.annotation; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -32,47 +24,28 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.Acknowledgment; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; /** * Tests for {@link KafkaListener} ackMode attribute. * - * @author GO BEOMJUN - * @since 4.1 + * @author Go BeomJun + * @since 4.0.1 */ @SpringJUnitConfig -@DirtiesContext -@EmbeddedKafka(topics = {"ackModeRecord", "ackModeManual", "ackModeDefault"}, partitions = 1) public class KafkaListenerAckModeTests { - @Autowired - private KafkaTemplate template; - - @Autowired - private Config config; - @Autowired private KafkaListenerEndpointRegistry registry; @Test - void testAckModeRecordOverride() throws Exception { - this.template.send("ackModeRecord", "test-record"); - assertThat(this.config.recordLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - // Verify that the listener container has the correct ack mode + void testAckModeRecordOverride() { MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener"); assertThat(container).isNotNull(); assertThat(container.getContainerProperties().getAckMode()) @@ -80,11 +53,7 @@ void testAckModeRecordOverride() throws Exception { } @Test - void testAckModeManualOverride() throws Exception { - this.template.send("ackModeManual", "test-manual"); - assertThat(this.config.manualLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - // Verify that the listener container has the correct ack mode + void testAckModeManualOverride() { MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener"); assertThat(container).isNotNull(); assertThat(container.getContainerProperties().getAckMode()) @@ -92,11 +61,7 @@ void testAckModeManualOverride() throws Exception { } @Test - void testAckModeDefault() throws Exception { - this.template.send("ackModeDefault", "test-default"); - assertThat(this.config.defaultLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - // Verify that the listener container uses factory default (BATCH) + void testAckModeDefault() { MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener"); assertThat(container).isNotNull(); assertThat(container.getContainerProperties().getAckMode()) @@ -107,34 +72,16 @@ void testAckModeDefault() throws Exception { @EnableKafka public static class Config { - final CountDownLatch recordLatch = new CountDownLatch(1); - - final CountDownLatch manualLatch = new CountDownLatch(1); - - final CountDownLatch defaultLatch = new CountDownLatch(1); - - @Bean - public ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { - Map consumerProps = new HashMap<>(KafkaTestUtils.consumerProps(broker, "testGroup", false)); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return new DefaultKafkaConsumerFactory<>(consumerProps); - } - - @Bean - public ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { - return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(broker)); - } - @Bean - public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { - return new KafkaTemplate<>(producerFactory); + @SuppressWarnings("unchecked") + public ConsumerFactory consumerFactory() { + return mock(ConsumerFactory.class); } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConsumerFactory consumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); // Set factory default to BATCH @@ -142,20 +89,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCon return factory; } - @KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD") + @KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD", autoStartup = "false") public void listenWithRecordAck(String message) { - this.recordLatch.countDown(); } - @KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL") + @KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL", autoStartup = "false") public void listenWithManualAck(String message, Acknowledgment ack) { - ack.acknowledge(); - this.manualLatch.countDown(); } - @KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault") + @KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault", autoStartup = "false") public void listenWithDefaultAck(String message) { - this.defaultLatch.countDown(); } } From 00456e776406afd9d6cf30bda1b721ad74a012f2 Mon Sep 17 00:00:00 2001 From: gobeomjun Date: Tue, 25 Nov 2025 01:43:38 +0900 Subject: [PATCH 5/5] Document ackMode attribute in @KafkaListener Signed-off-by: gobeomjun --- .../receiving-messages/listener-annotation.adoc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc index 8cbbd01570..be78a2e31c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc @@ -189,6 +189,20 @@ public void listen(String data, Acknowledgment ack) { } ---- +Starting with version 4.0.1, you can override the container factory's default `AckMode` directly on the `@KafkaListener` annotation using the `ackMode` attribute: + +[source, java] +---- +@KafkaListener(id = "manual", topics = "myTopic", ackMode = "MANUAL") +public void listen(String data, Acknowledgment ack) { + ... + ack.acknowledge(); +} +---- + +The `ackMode` attribute accepts string values corresponding to `ContainerProperties.AckMode` enum values: `RECORD`, `BATCH`, `TIME`, `COUNT`, `COUNT_TIME`, `MANUAL`, or `MANUAL_IMMEDIATE`. +This eliminates the need to create separate container factories solely for different acknowledgment modes. + [[consumer-record-metadata]] == Consumer Record Metadata