Skip to content

Commit 9be325f

Browse files
committed
docs : add inline comment
Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent 2d92639 commit 9be325f

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
844844

845845
private @Nullable ConsumerRecords<K, V> remainingRecords;
846846

847+
// use to fix tx marker
847848
private @Nullable ConsumerRecords<K, V> lastRecords;
848849

849850
private boolean pauseForPending;
@@ -1613,6 +1614,15 @@ private void saveLastRecordsIfNeeded(ConsumerRecords<K, V> records) {
16131614
}
16141615
}
16151616

1617+
/**
1618+
* Fix transactional offsets using ConsumerRecords#nextOffsets() API.
1619+
* This method addresses the issue where Kafka transaction markers can cause
1620+
* incorrect offset tracking. By using nextOffsets() instead of position(),
1621+
* we ensure:
1622+
* - Transaction markers are automatically filtered out
1623+
* - Partition-leader epoch information is correctly captured
1624+
* - Empty poll() cases are properly handled
1625+
*/
16161626
private void fixTxOffsetsIfNeeded() {
16171627
if (!this.fixTxOffsets) {
16181628
return;
@@ -1627,6 +1637,7 @@ private void fixTxOffsetsIfNeeded() {
16271637
Map<TopicPartition, OffsetAndMetadata> nextOffsets = this.lastRecords.nextOffsets();
16281638
Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
16291639

1640+
// Fix offset only if records were actually processed
16301641
nextOffsets.forEach((tp, nextOffset) -> {
16311642
OffsetAndMetadata committed = this.lastCommits.get(tp);
16321643

@@ -1635,6 +1646,7 @@ private void fixTxOffsetsIfNeeded() {
16351646
return;
16361647
}
16371648

1649+
// Only fix if we have processed records and the next offset is greater than committed
16381650
if (!this.lastRecords.records(tp).isEmpty() && nextOffset.offset() > committed.offset()) {
16391651
toFix.put(tp, nextOffset);
16401652
}

0 commit comments

Comments
 (0)