Skip to content

Commit 2d92639

Browse files
committed
refactor: use ConsumerRecords#nextOffsets in fixTxOffsets
Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent ec7aeb4 commit 2d92639

File tree

1 file changed

+49
-37
lines changed

1 file changed

+49
-37
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@
172172
* @author Christian Fredriksson
173173
* @author Timofey Barabanov
174174
* @author Janek Lasocki-Biczysko
175+
* @author Su Ko
175176
*/
176177
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
177178
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -843,6 +844,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
843844

844845
private @Nullable ConsumerRecords<K, V> remainingRecords;
845846

847+
private @Nullable ConsumerRecords<K, V> lastRecords;
848+
846849
private boolean pauseForPending;
847850

848851
private boolean firstPoll;
@@ -1532,7 +1535,7 @@ private void doProcessCommits() {
15321535
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
15331536
if (records != null && records.count() > 0) {
15341537
this.receivedSome = true;
1535-
savePositionsIfNeeded(records);
1538+
saveLastRecordsIfNeeded(records);
15361539
notIdle();
15371540
notIdlePartitions(records.partitions());
15381541
invokeListener(records);
@@ -1604,50 +1607,59 @@ private void notIdle() {
16041607
}
16051608
}
16061609

1607-
private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
1610+
private void saveLastRecordsIfNeeded(ConsumerRecords<K, V> records) {
16081611
if (this.fixTxOffsets) {
1609-
this.savedPositions.clear();
1610-
records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
1612+
this.lastRecords = records;
16111613
}
16121614
}
16131615

16141616
private void fixTxOffsetsIfNeeded() {
1615-
if (this.fixTxOffsets) {
1616-
try {
1617-
Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
1618-
this.lastCommits.forEach((tp, oamd) -> {
1619-
long position = this.consumer.position(tp);
1620-
Long saved = this.savedPositions.get(tp);
1621-
if (saved != null && saved != position) {
1622-
this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
1623-
+ "saved: " + this.savedPositions + ", "
1624-
+ "committed: " + oamd + ", "
1625-
+ "current: " + tp + "@" + position);
1626-
return;
1627-
}
1628-
if (position > oamd.offset()) {
1629-
toFix.put(tp, createOffsetAndMetadata(position));
1630-
}
1631-
});
1632-
if (!toFix.isEmpty()) {
1633-
this.logger.debug(() -> "Fixing TX offsets: " + toFix);
1634-
if (this.kafkaTxManager == null) {
1635-
commitOffsets(toFix);
1636-
}
1637-
else {
1638-
Objects.requireNonNull(this.transactionTemplate).executeWithoutResult(
1639-
status -> doSendOffsets(getTxProducer(), toFix));
1640-
}
1641-
}
1642-
}
1643-
catch (Exception e) {
1644-
this.logger.error(e, () -> "Failed to correct transactional offset(s): "
1645-
+ ListenerConsumer.this.lastCommits);
1617+
if (!this.fixTxOffsets) {
1618+
return;
1619+
}
1620+
1621+
try {
1622+
if (this.lastRecords == null) {
1623+
this.logger.trace(() -> "No previous records available for TX offset fix.");
1624+
return;
16461625
}
1647-
finally {
1648-
ListenerConsumer.this.lastCommits.clear();
1626+
1627+
Map<TopicPartition, OffsetAndMetadata> nextOffsets = this.lastRecords.nextOffsets();
1628+
Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
1629+
1630+
nextOffsets.forEach((tp, nextOffset) -> {
1631+
OffsetAndMetadata committed = this.lastCommits.get(tp);
1632+
1633+
if (committed == null) {
1634+
this.logger.debug(() -> "No committed offset for " + tp + "; skipping TX offset fix.");
1635+
return;
1636+
}
1637+
1638+
if (!this.lastRecords.records(tp).isEmpty() && nextOffset.offset() > committed.offset()) {
1639+
toFix.put(tp, nextOffset);
1640+
}
1641+
});
1642+
1643+
if (!toFix.isEmpty()) {
1644+
this.logger.debug(() ->
1645+
String.format("Fixing TX offsets for %d partitions: %s", toFix.size(), toFix));
1646+
if (this.kafkaTxManager == null) {
1647+
commitOffsets(toFix);
1648+
}
1649+
else {
1650+
Objects.requireNonNull(this.transactionTemplate)
1651+
.executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
1652+
}
16491653
}
16501654
}
1655+
catch (Exception e) {
1656+
this.logger.error(e, () -> "Failed to correct transactional offset(s): "
1657+
+ ListenerConsumer.this.lastCommits);
1658+
}
1659+
finally {
1660+
ListenerConsumer.this.lastCommits.clear();
1661+
this.lastRecords = null;
1662+
}
16511663
}
16521664

16531665
private @Nullable ConsumerRecords<K, V> doPoll() {

0 commit comments

Comments
 (0)