diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc index 8cbbd01570..be78a2e31c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc @@ -189,6 +189,20 @@ public void listen(String data, Acknowledgment ack) { } ---- +Starting with version 4.0.1, you can override the container factory's default `AckMode` directly on the `@KafkaListener` annotation using the `ackMode` attribute: + +[source, java] +---- +@KafkaListener(id = "manual", topics = "myTopic", ackMode = "MANUAL") +public void listen(String data, Acknowledgment ack) { + ... + ack.acknowledge(); +} +---- + +The `ackMode` attribute accepts string values corresponding to `ContainerProperties.AckMode` enum values: `RECORD`, `BATCH`, `TIME`, `COUNT`, `COUNT_TIME`, `MANUAL`, or `MANUAL_IMMEDIATE`. +This eliminates the need to create separate container factories solely for different acknowledgment modes. + [[consumer-record-metadata]] == Consumer Record Metadata diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index ce3621f42d..95a5bb202e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -338,4 +338,14 @@ * @since 3.1 */ String containerPostProcessor() default ""; + + /** + * Override the container factory's default {@code ackMode} for this listener. + *

+ * Supports SpEL {@code #{...}} and property placeholders {@code ${...}}. + * @return the ack mode (case-insensitive), or empty string to use factory default. + * @since 4.0.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + String ackMode() default ""; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f0c5f3faba..c757bc0ecc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -693,6 +693,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint en resolveErrorHandler(endpoint, kafkaListener); resolveContentTypeConverter(endpoint, kafkaListener); resolveFilter(endpoint, kafkaListener); + resolveAckMode(endpoint, kafkaListener); resolveContainerPostProcessor(endpoint, kafkaListener); } @@ -740,6 +741,16 @@ private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaList } } + private void resolveAckMode(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { + String ackMode = kafkaListener.ackMode(); + if (StringUtils.hasText(ackMode)) { + String ackModeValue = resolveExpressionAsString(ackMode, "ackMode"); + if (StringUtils.hasText(ackModeValue)) { + endpoint.setAckMode(ackModeValue); + } + } + } + private @Nullable KafkaListenerContainerFactory resolveContainerFactory(KafkaListener kafkaListener, @Nullable Object factoryTarget, String beanName) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index d26ef01dc6..4cd73e5af8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -456,7 +456,9 @@ else if (this.autoStartup != null) { .acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId) .acceptIfNotNull(endpoint.getConsumerProperties(), instance.getContainerProperties()::setKafkaConsumerProperties) - .acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo); + .acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo) + .acceptIfNotNull(endpoint.getAckMode(), ackMode -> + properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase()))); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 76b201300a..d2325c9c0b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -125,6 +125,8 @@ public abstract class AbstractKafkaListenerEndpoint @Nullable private String mainListenerId; + private @Nullable String ackMode; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -387,6 +389,25 @@ public void setAutoStartup(@Nullable Boolean autoStartup) { this.autoStartup = autoStartup; } + /** + * Return the ackMode for this endpoint's container. + * @return the ackMode. + * @since 4.1 + */ + public @Nullable String getAckMode() { + return this.ackMode; + } + + /** + * Set the ackMode for this endpoint's container to override the factory's default. + * @param ackMode the ackMode string (case-insensitive). + * @since 4.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + public void setAckMode(@Nullable String ackMode) { + this.ackMode = ackMode; + } + /** * Set a configurer which will be invoked when creating a reply message. * @param replyHeadersConfigurer the configurer. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 16f4123430..6aab0c641c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -187,4 +187,15 @@ default Boolean getBatchListener() { return null; } + /** + * Return the ackMode for this endpoint, or null if not explicitly set. + * @return the ack mode string. + * @since 4.1 + * @see org.springframework.kafka.listener.ContainerProperties.AckMode + */ + @Nullable + default String getAckMode() { + return null; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java new file mode 100644 index 0000000000..26e1ece352 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerAckModeTests.java @@ -0,0 +1,106 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link KafkaListener} ackMode attribute. + * + * @author Go BeomJun + * @since 4.0.1 + */ +@SpringJUnitConfig +public class KafkaListenerAckModeTests { + + @Autowired + private KafkaListenerEndpointRegistry registry; + + @Test + void testAckModeRecordOverride() { + MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.RECORD); + } + + @Test + void testAckModeManualOverride() { + MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.MANUAL); + } + + @Test + void testAckModeDefault() { + MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener"); + assertThat(container).isNotNull(); + assertThat(container.getContainerProperties().getAckMode()) + .isEqualTo(ContainerProperties.AckMode.BATCH); + } + + @Configuration + @EnableKafka + public static class Config { + + @Bean + @SuppressWarnings("unchecked") + public ConsumerFactory consumerFactory() { + return mock(ConsumerFactory.class); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + // Set factory default to BATCH + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); + return factory; + } + + @KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD", autoStartup = "false") + public void listenWithRecordAck(String message) { + } + + @KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL", autoStartup = "false") + public void listenWithManualAck(String message, Acknowledgment ack) { + } + + @KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault", autoStartup = "false") + public void listenWithDefaultAck(String message) { + } + + } + +}