Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ public void listen(String data, Acknowledgment ack) {
}
----

Starting with version 4.0.1, you can override the container factory's default `AckMode` directly on the `@KafkaListener` annotation using the `ackMode` attribute:

[source, java]
----
@KafkaListener(id = "manual", topics = "myTopic", ackMode = "MANUAL")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
----

The `ackMode` attribute accepts string values corresponding to `ContainerProperties.AckMode` enum values: `RECORD`, `BATCH`, `TIME`, `COUNT`, `COUNT_TIME`, `MANUAL`, or `MANUAL_IMMEDIATE`.
This eliminates the need to create separate container factories solely for different acknowledgment modes.

[[consumer-record-metadata]]
== Consumer Record Metadata

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,14 @@
* @since 3.1
*/
String containerPostProcessor() default "";

/**
* Override the container factory's default {@code ackMode} for this listener.
* <p>
* Supports SpEL {@code #{...}} and property placeholders {@code ${...}}.
* @return the ack mode (case-insensitive), or empty string to use factory default.
* @since 4.0.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
String ackMode() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
resolveErrorHandler(endpoint, kafkaListener);
resolveContentTypeConverter(endpoint, kafkaListener);
resolveFilter(endpoint, kafkaListener);
resolveAckMode(endpoint, kafkaListener);
resolveContainerPostProcessor(endpoint, kafkaListener);
}

Expand Down Expand Up @@ -740,6 +741,16 @@ private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaList
}
}

private void resolveAckMode(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
String ackMode = kafkaListener.ackMode();
if (StringUtils.hasText(ackMode)) {
String ackModeValue = resolveExpressionAsString(ackMode, "ackMode");
if (StringUtils.hasText(ackModeValue)) {
endpoint.setAckMode(ackModeValue);
}
}
}

private @Nullable KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener kafkaListener,
@Nullable Object factoryTarget, String beanName) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,9 @@ else if (this.autoStartup != null) {
.acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
.acceptIfNotNull(endpoint.getConsumerProperties(),
instance.getContainerProperties()::setKafkaConsumerProperties)
.acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo);
.acceptIfNotNull(endpoint.getListenerInfo(), instance::setListenerInfo)
.acceptIfNotNull(endpoint.getAckMode(), ackMode ->
properties.setAckMode(ContainerProperties.AckMode.valueOf(ackMode.toUpperCase())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
@Nullable
private String mainListenerId;

private @Nullable String ackMode;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -387,6 +389,25 @@ public void setAutoStartup(@Nullable Boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* Return the ackMode for this endpoint's container.
* @return the ackMode.
* @since 4.1
*/
public @Nullable String getAckMode() {
return this.ackMode;
}

/**
* Set the ackMode for this endpoint's container to override the factory's default.
* @param ackMode the ackMode string (case-insensitive).
* @since 4.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
public void setAckMode(@Nullable String ackMode) {
this.ackMode = ackMode;
}

/**
* Set a configurer which will be invoked when creating a reply message.
* @param replyHeadersConfigurer the configurer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,15 @@ default Boolean getBatchListener() {
return null;
}

/**
* Return the ackMode for this endpoint, or null if not explicitly set.
* @return the ack mode string.
* @since 4.1
* @see org.springframework.kafka.listener.ContainerProperties.AckMode
*/
@Nullable
default String getAckMode() {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2025-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
* Tests for {@link KafkaListener} ackMode attribute.
*
* @author Go BeomJun
* @since 4.0.1
*/
@SpringJUnitConfig
public class KafkaListenerAckModeTests {

@Autowired
private KafkaListenerEndpointRegistry registry;

@Test
void testAckModeRecordOverride() {
MessageListenerContainer container = this.registry.getListenerContainer("ackModeRecordListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.RECORD);
}

@Test
void testAckModeManualOverride() {
MessageListenerContainer container = this.registry.getListenerContainer("ackModeManualListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.MANUAL);
}

@Test
void testAckModeDefault() {
MessageListenerContainer container = this.registry.getListenerContainer("ackModeDefaultListener");
assertThat(container).isNotNull();
assertThat(container.getContainerProperties().getAckMode())
.isEqualTo(ContainerProperties.AckMode.BATCH);
}

@Configuration
@EnableKafka
public static class Config {

@Bean
@SuppressWarnings("unchecked")
public ConsumerFactory<String, String> consumerFactory() {
return mock(ConsumerFactory.class);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// Set factory default to BATCH
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}

@KafkaListener(id = "ackModeRecordListener", topics = "ackModeRecord", ackMode = "RECORD", autoStartup = "false")
public void listenWithRecordAck(String message) {
}

@KafkaListener(id = "ackModeManualListener", topics = "ackModeManual", ackMode = "MANUAL", autoStartup = "false")
public void listenWithManualAck(String message, Acknowledgment ack) {
}

@KafkaListener(id = "ackModeDefaultListener", topics = "ackModeDefault", autoStartup = "false")
public void listenWithDefaultAck(String message) {
}

}

}