@@ -1,7 +1,7 @@
[[native-images]]
= Native Images

{spring-framework-reference-url}/core/aot.html[Spring AOT] native hints are provided to assist in developing native images for Spring applications that use Spring for Apache Kafka, including hints for AVRO generated classes used in `@KafkaListener`+++s+++.
{spring-framework-reference-url}/core/aot.html[Spring AOT] native hints are provided to assist in developing native images for Spring applications that use Spring for Apache Kafka, including hints for AVRO generated classes used in ``@KafkaListener``s.

IMPORTANT: `spring-kafka-test` (and, specifically, its `EmbeddedKafkaBroker`) is not supported in native images.

@@ -50,7 +50,7 @@ It has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
----

Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode`+++s+++.
Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual ``AckMode``s.

[source, java]
----
@@ -250,7 +250,7 @@ Always ensure that exceptions thrown in message processing code explicitly exten
In other words, if the application throws an exception, ensure that it is extended from `RuntimeException` and not inadvertently inherited from `Error`.
Standard errors like `OutOfMemoryError`, `IllegalAccessError`, and other errors beyond the control of the application are still treated as ``Error``s and not retried.

The error handler can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress.
The error handler can be configured with one or more ``RetryListener``s, receiving notifications of retry and recovery progress.
Starting with version 2.8.10, methods for batch listeners were added.

[source, java]
@@ -487,7 +487,7 @@ public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment a

Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer` or a `StringDeserializer`, as well as a `DefaultErrorHandler`.
When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`.
A list of `ConversionException`+++s+++ is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred.
A list of ``ConversionException``s is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred.

Example:

@@ -756,7 +756,7 @@ Since the event also has a reference to the container, you can restart the conta

Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay.

Starting with version 2.7, the processor can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress.
Starting with version 2.7, the processor can be configured with one or more ``RetryListener``s, receiving notifications of retry and recovery progress.

[source, java]
----
@@ -838,7 +838,7 @@ public void listen(@Payload Thing thing,
}
----

When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the `KafkaListenerAnnotationBeanPostProcessor`+++'+++s `charSet` property.
When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the ``KafkaListenerAnnotationBeanPostProcessor``'s `charSet` property.

The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.
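
For illustration, a minimal `RecordFilterStrategy` sketch along these lines; it assumes the header in question is `KafkaHeaders.LISTENER_INFO` and the default UTF-8 `charSet` (the filter logic itself is hypothetical):

[source, java]
----
public class InfoFilter implements RecordFilterStrategy<String, Thing> {

    @Override
    public boolean filter(ConsumerRecord<String, Thing> consumerRecord) {
        // In a filter (or interceptor) the header is still a raw byte array; decode it with
        // the same charset configured on the KafkaListenerAnnotationBeanPostProcessor.
        Header info = consumerRecord.headers().lastHeader(KafkaHeaders.LISTENER_INFO);
        String value = info == null ? null : new String(info.value(), StandardCharsets.UTF_8);
        return !"process".equals(value); // returning true filters (skips) the record
    }

}
----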

@@ -911,7 +911,7 @@ The record sent to the dead-letter topic is enhanced with the following headers:
* `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type.
* `KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP`: The original consumer group that failed to process the record (since version 2.8).

Key exceptions are only caused by `DeserializationException`+++s+++ so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`.
Key exceptions are only caused by ``DeserializationException``s so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`.

There are two mechanisms to add more headers.

@@ -923,8 +923,8 @@ The second is simpler to implement but the first has more information available,

Starting with version 2.3, when used in conjunction with an `ErrorHandlingDeserializer`, the publisher will restore the record `value()`, in the dead-letter producer record, to the original value that failed to be deserialized.
Previously, the `value()` was null and user code had to decode the `DeserializationException` from the message headers.
In addition, you can provide multiple `KafkaTemplate`+++s+++ to the publisher; this might be needed, for example, if you want to publish the `byte[]` from a `DeserializationException`, as well as values using a different serializer from records that were deserialized successfully.
Here is an example of configuring the publisher with `KafkaTemplate`+++s+++ that use a `String` and `byte[]` serializer:
In addition, you can provide multiple ``KafkaTemplate``s to the publisher; this might be needed, for example, if you want to publish the `byte[]` from a `DeserializationException`, as well as values using a different serializer from records that were deserialized successfully.
Here is an example of configuring the publisher with ``KafkaTemplate``s that use a `String` and `byte[]` serializer:

[source, java]
----
@@ -43,7 +43,7 @@ include::{kotlin-examples}/topics/Config.kt[tag=brokerProps]
----
======

Starting with version 2.7, you can declare multiple `NewTopic`+++s+++ in a single `KafkaAdmin.NewTopics` bean definition:
Starting with version 2.7, you can declare multiple ``NewTopic``s in a single `KafkaAdmin.NewTopics` bean definition:

[tabs]
======
@@ -63,7 +63,7 @@ include::{kotlin-examples}/topics/Config.kt[tag=newTopicsBean]
======


IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean`+++s+++.
IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) ``@Bean``s.
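
For example, a minimal sketch of such a bean in a Boot application (the topic name and settings are illustrative):

[source, java]
----
@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .build();
}
----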

By default, if the broker is not available, a message is logged, but the context continues to load.
You can programmatically invoke the admin's `initialize()` method to try again later.
@@ -15,7 +15,7 @@ To close existing Consumers, call `stop()` (and then `start()`) on the `KafkaLis
For convenience, the framework also provides an `ABSwitchCluster` which supports two sets of bootstrap servers; one of which is active at any time.
Configure the `ABSwitchCluster` and add it to the producer and consumer factories, and the `KafkaAdmin`, by calling `setBootstrapServersSupplier()`.
When you want to switch, call `primary()` or `secondary()` and call `reset()` on the producer factory to establish new connection(s); for consumers, `stop()` and `start()` all listener containers.
When using `@KafkaListener`+++s+++, `stop()` and `start()` the `KafkaListenerEndpointRegistry` bean.
When using ``@KafkaListener``s, `stop()` and `start()` the `KafkaListenerEndpointRegistry` bean.

See the Javadocs for more information.
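
As a rough sketch (bean names, server lists, and the fail-over method are illustrative assumptions), the wiring and switch-over might look like this:

[source, java]
----
@Bean
public ABSwitchCluster cluster() {
    return new ABSwitchCluster("primary1:9092,primary2:9092", "secondary1:9092,secondary2:9092");
}

@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory(ABSwitchCluster cluster) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "primary1:9092,primary2:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    pf.setBootstrapServersSupplier(cluster); // the supplier decides between primary and secondary
    return pf;
}

public void failOver(ABSwitchCluster cluster, DefaultKafkaProducerFactory<String, String> pf,
        KafkaListenerEndpointRegistry registry) {

    cluster.secondary(); // switch the active bootstrap servers
    pf.reset();          // close cached producers; new ones connect to the new cluster
    registry.stop();     // stop and restart the listener containers to re-create consumers
    registry.start();
}
----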

@@ -112,12 +112,12 @@ The time to process a batch of records plus this value must be less than the `ma

|[[idleEventInterval]]<<idleEventInterval,`idleEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdleEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
|When set, enables publication of ``ListenerContainerIdleEvent``s, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
Also see `idleBeforeDataMultiplier`.

|[[idlePartitionEventInterval]]<<idlePartitionEventInterval,`idlePartitionEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdlePartitionEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
|When set, enables publication of ``ListenerContainerIdlePartitionEvent``s, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].

|[[kafkaConsumerProperties]]<<kafkaConsumerProperties,`kafkaConsumerProperties`>>
|None
@@ -287,7 +287,7 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han

|[[listenerId]]<<listenerId,`listenerId`>>
|See desc.
|The bean name for user-configured containers or the `id` attribute of `@KafkaListener`+++s+++.
|The bean name for user-configured containers or the `id` attribute of ``@KafkaListener``s.

|[[listenerInfo]]<<listenerInfo,`listenerInfo`>>
|null
@@ -342,18 +342,18 @@ Also see `interceptBeforeTx`.

|[[assignedPartitions2]]<<assignedPartitions2,`assignedPartitions`>>
|(read only)
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not).
|The aggregate of partitions currently assigned to this container's child ``KafkaMessageListenerContainer``s (explicitly or not).

|[[concurrency]]<<concurrency,`concurrency`>>
|1
|The number of child `KafkaMessageListenerContainer`+++s+++ to manage.
|The number of child ``KafkaMessageListenerContainer``s to manage.

|[[containerPaused2]]<<containerPaused2,`containerPaused`>>
|n/a
|True if pause has been requested and all child containers' consumers have actually paused.

|[[containers]]<<containers,`containers`>>
|n/a
|A reference to all child `KafkaMessageListenerContainer`+++s+++.
|A reference to all child ``KafkaMessageListenerContainer``s.
|===

@@ -176,7 +176,7 @@ You can also use `@EventListener`, introduced in Spring Framework 4.2.

The next example combines `@KafkaListener` and `@EventListener` into a single class.
You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the `@EventListener`+++'+++s `condition` for this purpose.
You can also use the ``@EventListener``'s `condition` for this purpose.

See xref:kafka/events.adoc[Application Events] for information about event properties.
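
For example, a minimal sketch of such a conditional handler (the `qux` listener id is a hypothetical example):

[source, java]
----
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void onIdle(ListenerContainerIdleEvent event) {
    // Only idle events from containers whose listener id starts with 'qux-' arrive here;
    // child containers of a concurrent container carry an index suffix (e.g. 'qux-0').
    System.out.println("Container went idle: " + event.getListenerId());
}
----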

@@ -4,8 +4,8 @@
[[monitoring-listener-performance]]
== Monitoring Listener Performance

Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the `ContainerProperty`+++'+++s `micrometerEnabled` to `false`.
Starting with version 2.3, the listener container will automatically create and update Micrometer ``Timer``s for the listener, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the ``ContainerProperty``'s `micrometerEnabled` to `false`.

Two timers are maintained - one for successful calls to the listener and one for failures.

@@ -15,16 +15,16 @@ The timers are named `spring.kafka.listener` and have the following tags:
* `result` : `success` or `failure`
* `exception` : `none` or `ListenerExecutionFailedException`

You can add additional tags using the `ContainerProperties`+++'+++s `micrometerTags` property.
You can add additional tags using the ``ContainerProperties``'s `micrometerTags` property.

Starting with versions 2.9.8, 3.0.6, you can provide a function in `ContainerProperties`+++'+++s `micrometerTagsProvider`; the function receives the `ConsumerRecord<?, ?>` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`.
Starting with versions 2.9.8, 3.0.6, you can provide a function in ``ContainerProperties``'s `micrometerTagsProvider`; the function receives the `ConsumerRecord<?, ?>` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`.
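
For example, a minimal sketch of supplying such a function through a hypothetical container factory (the tag name is illustrative):

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Per-record tags, merged with any static tags configured in micrometerTags.
    factory.getContainerProperties().setMicrometerTagsProvider(record ->
            Map.of("sourceTopic", record.topic()));
    return factory;
}
----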

NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.

[[monitoring-kafkatemplate-performance]]
== Monitoring KafkaTemplate Performance

Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
Starting with version 2.5, the template will automatically create and update Micrometer ``Timer``s for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.

Two timers are maintained - one for successful send operations and one for failures.
@@ -52,7 +52,7 @@ public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Int

By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
Starting with version 2.9, this will apply to any transaction manager, not just ``KafkaAwareTransactionManager``s.
This allows, for example, the interceptor to participate in a JDBC transaction started by the container.

Starting with versions 2.3.8, 2.4.6, the `ConcurrentMessageListenerContainer` now supports {kafka-url}/documentation/#static_membership[Static Membership] when the concurrency is greater than one.
@@ -183,7 +183,7 @@ org.apache.kafka.clients.consumer.RoundRobinAssignor
=====
====

When the container properties are configured with `TopicPartitionOffset`+++s+++, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.
When the container properties are configured with ``TopicPartitionOffset``s, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.

If, say, six `TopicPartitionOffset` instances are provided and the `concurrency` is `3`, each container gets two partitions.
For five `TopicPartitionOffset` instances, two containers get two partitions, and the third gets one.
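
As a rough sketch of the six-partition case (topic name, group id, and listener body are illustrative placeholders):

[source, java]
----
ContainerProperties containerProps = new ContainerProperties(
        new TopicPartitionOffset("topic1", 0), new TopicPartitionOffset("topic1", 1),
        new TopicPartitionOffset("topic1", 2), new TopicPartitionOffset("topic1", 3),
        new TopicPartitionOffset("topic1", 4), new TopicPartitionOffset("topic1", 5));
containerProps.setGroupId("group1");
containerProps.setMessageListener((MessageListener<String, String>) record -> {
    // process the record
});

ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
container.setConcurrency(3); // each of the three child containers gets two of the six partitions
container.start();
----
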
@@ -1,10 +1,10 @@
[[sequencing]]
= Starting `@KafkaListener`+++s+++ in Sequence
= Starting ``@KafkaListener``s in Sequence

A common use case is to start a listener after another listener has consumed all the records in a topic.
For example, you may want to load the contents of one or more compacted topics into memory before processing records from other topics.
Starting with version 2.7.3, a new component `ContainerGroupSequencer` has been introduced.
It uses the `@KafkaListener`+++'+++s `containerGroup` property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.
It uses the ``@KafkaListener``'s `containerGroup` property to group containers together and start the containers in the next group, when all the containers in the current group have gone idle.

It is best illustrated with an example.

@@ -37,7 +37,7 @@ Here, we have 4 listeners in two groups, `g1` and `g2`.
During application context initialization, the sequencer sets the `autoStartup` property of all the containers in the provided groups to `false`.
It also sets the `idleEventInterval` for any containers (that do not already have one set) to the supplied value (5000ms in this case).
Then, when the sequencer is started by the application context, the containers in the first group are started.
As `ListenerContainerIdleEvent`+++s+++ are received, each individual child container in each container is stopped.
As ``ListenerContainerIdleEvent``s are received, each individual child container in each container is stopped.
When all child containers in a `ConcurrentMessageListenerContainer` are stopped, the parent container is stopped.
When all containers in a group have been stopped, the containers in the next group are started.
There is no limit to the number of groups or containers in a group.
@@ -20,5 +20,5 @@ ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Durati
As you can see, you need to know the partition and offset of the record(s) you need to retrieve; a new `Consumer` is created (and closed) for each operation.

With the last two methods, each record is retrieved individually and the results assembled into a `ConsumerRecords` object.
When creating the `TopicPartitionOffset`+++s+++ for the request, only positive, absolute offsets are supported.
When creating the ``TopicPartitionOffset``s for the request, only positive, absolute offsets are supported.
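
For example, a usage sketch of the collection variant, given a suitably configured `KafkaTemplate` (`template`); the topic, partitions, and offsets are illustrative:

[source, java]
----
ConsumerRecords<String, Thing> records = template.receive(List.of(
        new TopicPartitionOffset("things", 0, 42L),
        new TopicPartitionOffset("things", 1, 43L)),
        Duration.ofSeconds(5));
----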
