Skip to content

Commit d48bcd4

Browse files
committed
refactor: use ConsumerRecords#nextOffsets in fixTxOffsets
Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent 28ef2da commit d48bcd4

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1630,7 +1630,7 @@ private void fixTxOffsetsIfNeeded() {
16301630
nextOffsets.forEach((tp, nextOffset) -> {
16311631
OffsetAndMetadata committed = this.lastCommits.get(tp);
16321632

1633-
if(committed == null) {
1633+
if (committed == null) {
16341634
this.logger.debug(() -> "No committed offset for " + tp + "; skipping TX offset fix.");
16351635
return;
16361636
}
@@ -1645,7 +1645,8 @@ private void fixTxOffsetsIfNeeded() {
16451645
String.format("Fixing TX offsets for %d partitions: %s", toFix.size(), toFix));
16461646
if (this.kafkaTxManager == null) {
16471647
commitOffsets(toFix);
1648-
} else {
1648+
}
1649+
else {
16491650
Objects.requireNonNull(this.transactionTemplate)
16501651
.executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
16511652
}

0 commit comments

Comments
 (0)