Commit 1abc7c1

Refactor fixTxOffsets to use ConsumerRecords#nextOffsets
Replaced the position()-based offset correction with nextOffsets() to address known issues with transactional topics:

- Works correctly with both read_committed and read_uncommitted
- Handles empty poll() advancement and leader epoch propagation
- Fixes the case where max.poll.records equals the transaction batch size

Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent ec7aeb4 commit 1abc7c1
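For context, here is a minimal sketch (not part of this commit) contrasting the two ways of deriving the "next offset to commit" after a poll(). It assumes Kafka clients 4.0+, where KIP-1094 added ConsumerRecords#nextOffsets(); the class and method names are illustrative only.

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class NextOffsetsSketch {

        // Hypothetical helper, for illustration only.
        static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(Consumer<Integer, String> consumer) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(500));

            // Old approach: consumer.position(tp) is the fetch position. On transactional
            // topics it can lag behind a transaction's commit marker until a later
            // (possibly empty) poll() advances past it, and it is a bare long, so the
            // leader epoch is lost.
            Map<TopicPartition, OffsetAndMetadata> fromPosition = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                fromPosition.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }

            // New approach (KIP-1094): per partition, the next offset the consumer will
            // consume after exactly this batch, as an OffsetAndMetadata that carries the
            // leader epoch, so no separate position bookkeeping is needed.
            return records.nextOffsets();
        }
    }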

2 files changed (+282, -38 lines)

2 files changed

+282
-38
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 61 additions & 37 deletions
@@ -172,6 +172,7 @@
  * @author Christian Fredriksson
  * @author Timofey Barabanov
  * @author Janek Lasocki-Biczysko
+ * @author Su Ko
  */
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
         extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -843,6 +844,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
     private @Nullable ConsumerRecords<K, V> remainingRecords;
 
+    // used to fix offsets after transaction markers
+    private @Nullable ConsumerRecords<K, V> lastRecords;
+
     private boolean pauseForPending;
 
     private boolean firstPoll;
@@ -1530,9 +1534,10 @@ private void doProcessCommits() {
     }
 
     private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
+        saveLastRecordsIfNeeded(records);
+
         if (records != null && records.count() > 0) {
             this.receivedSome = true;
-            savePositionsIfNeeded(records);
             notIdle();
             notIdlePartitions(records.partitions());
             invokeListener(records);
@@ -1604,50 +1609,69 @@ private void notIdle() {
         }
     }
 
-    private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
-        if (this.fixTxOffsets) {
-            this.savedPositions.clear();
-            records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
+    private void saveLastRecordsIfNeeded(@Nullable ConsumerRecords<K, V> records) {
+        if (this.fixTxOffsets && records != null && !records.nextOffsets().isEmpty()) {
+            this.lastRecords = records;
         }
     }
 
+    /**
+     * Fix transactional offsets using the ConsumerRecords#nextOffsets() API.
+     * This method addresses the issue where Kafka transaction markers can cause
+     * incorrect offset tracking. By using nextOffsets() instead of position(),
+     * we ensure:
+     * - transaction markers are accounted for automatically
+     * - partition leader epoch information is correctly captured
+     * - empty poll() cases are properly handled
+     */
     private void fixTxOffsetsIfNeeded() {
-        if (this.fixTxOffsets) {
-            try {
-                Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
-                this.lastCommits.forEach((tp, oamd) -> {
-                    long position = this.consumer.position(tp);
-                    Long saved = this.savedPositions.get(tp);
-                    if (saved != null && saved != position) {
-                        this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
-                                + "saved: " + this.savedPositions + ", "
-                                + "committed: " + oamd + ", "
-                                + "current: " + tp + "@" + position);
-                        return;
-                    }
-                    if (position > oamd.offset()) {
-                        toFix.put(tp, createOffsetAndMetadata(position));
-                    }
-                });
-                if (!toFix.isEmpty()) {
-                    this.logger.debug(() -> "Fixing TX offsets: " + toFix);
-                    if (this.kafkaTxManager == null) {
-                        commitOffsets(toFix);
-                    }
-                    else {
-                        Objects.requireNonNull(this.transactionTemplate).executeWithoutResult(
-                                status -> doSendOffsets(getTxProducer(), toFix));
-                    }
-                }
-            }
-            catch (Exception e) {
-                this.logger.error(e, () -> "Failed to correct transactional offset(s): "
-                        + ListenerConsumer.this.lastCommits);
+        if (!this.fixTxOffsets) {
+            return;
+        }
+        try {
+            if (this.lastRecords == null) {
+                this.logger.trace(() -> "No previous records available for TX offset fix.");
+                return;
             }
-            finally {
-                ListenerConsumer.this.lastCommits.clear();
+
+            Map<TopicPartition, OffsetAndMetadata> nextOffsets = this.lastRecords.nextOffsets();
+            Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
+
+            // fix an offset only when records were actually processed for the partition
+            nextOffsets.forEach((tp, nextOffset) -> {
+                OffsetAndMetadata committed = this.lastCommits.get(tp);
+                if (committed == null) {
+                    this.logger.debug(() -> "No committed offset for " + tp + "; skipping TX offset fix.");
+                    return;
+                }
+                // only fix when the next offset is ahead of the committed offset
+                if (nextOffset.offset() > committed.offset()) {
+                    toFix.put(tp, nextOffset);
+                }
+            });
+
+            if (!toFix.isEmpty()) {
+                this.logger.debug(() ->
+                        String.format("Fixing TX offsets for %d partition(s): %s", toFix.size(), toFix));
+                if (this.kafkaTxManager == null) {
+                    commitOffsets(toFix);
+                }
+                else {
+                    Objects.requireNonNull(this.transactionTemplate)
+                            .executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
+                }
+                ListenerConsumer.this.lastCommits.clear();
+                this.lastRecords = null;
             }
         }
+        catch (Exception e) {
+            this.logger.error(e, () -> "Failed to correct transactional offset(s): "
+                    + ListenerConsumer.this.lastCommits);
+        }
     }
 
     private @Nullable ConsumerRecords<K, V> doPoll() {
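Usage note, as a sketch rather than part of this commit: the corrected behavior is still opted into through the existing ContainerProperties.setFixTxOffsets(true) property, as the tests below do. The topic and group names here are placeholders.

    import org.springframework.kafka.listener.ContainerProperties;

    final class FixTxOffsetsConfigSketch {

        // Placeholder topic/group names; setFixTxOffsets(true) is the relevant setting.
        static ContainerProperties containerProperties() {
            ContainerProperties containerProps = new ContainerProperties("someTxTopic");
            containerProps.setGroupId("someGroup");
            containerProps.setFixTxOffsets(true); // opt in to TX offset correction
            return containerProps;
        }
    }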

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 221 additions & 1 deletion
@@ -116,7 +116,9 @@
         TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
         TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
         TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
-        TransactionalContainerTests.topic10},
+        TransactionalContainerTests.topic10, TransactionalContainerTests.topic11, TransactionalContainerTests.topic12,
+        TransactionalContainerTests.topic13, TransactionalContainerTests.topic14, TransactionalContainerTests.topic15
+        },
         brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
 public class TransactionalContainerTests {
 
@@ -146,6 +148,15 @@ public class TransactionalContainerTests {
 
     public static final String topic11 = "txTopic11";
 
+    public static final String topic12 = "txTopic12";
+
+    public static final String topic13 = "txTopic13";
+
+    public static final String topic14 = "txTopic14";
+
+    public static final String topic15 = "txTopic15";
+
     private static EmbeddedKafkaBroker embeddedKafka;
 
     @BeforeAll
@@ -1232,4 +1243,213 @@ public void afterRecord(
         container.stop();
         pf.destroy();
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    void testFixTxOffsetsWithReadUncommitted() throws Exception {
+        Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txReadUncommittedTest", false);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+        ContainerProperties containerProps = new ContainerProperties(topic12);
+        containerProps.setGroupId("txReadUncommittedTest");
+        containerProps.setPollTimeout(500L);
+        containerProps.setIdleEventInterval(500L);
+        containerProps.setFixTxOffsets(true);
+
+        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
+        pf.setTransactionIdPrefix("readUncommitted.");
+
+        final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+        final AtomicInteger messageCount = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+        containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
+            messageCount.incrementAndGet();
+            latch.countDown();
+        });
+
+        @SuppressWarnings({ "rawtypes" })
+        KafkaTransactionManager tm = new KafkaTransactionManager(pf);
+        containerProps.setKafkaAwareTransactionManager(tm);
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        container.setBeanName("testFixTxOffsetsWithReadUncommitted");
+
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+        CountDownLatch idleLatch = new CountDownLatch(1);
+        container.setApplicationEventPublisher(event -> {
+            if (event instanceof ListenerContainerIdleEvent) {
+                Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
+                committed.set(consumer.committed(
+                        Collections.singleton(new TopicPartition(topic12, 0))));
+                idleLatch.countDown();
+            }
+        });
+
+        container.start();
+
+        template.setDefaultTopic(topic12);
+        template.executeInTransaction(t -> {
+            template.sendDefault(0, 0, "msg1");
+            template.sendDefault(0, 0, "msg2");
+            template.sendDefault(0, 0, "msg3");
+            return null;
+        });
+
+        assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+        assertThat(idleLatch.await(60, TimeUnit.SECONDS)).isTrue();
+
+        assertThat(messageCount.get()).isGreaterThanOrEqualTo(3);
+        TopicPartition partition0 = new TopicPartition(topic12, 0);
+        assertThat(committed.get().get(partition0)).isNotNull();
+
+        // offsets 0, 1, 2 are the records; offset 3 is the tx marker => next offset 4
+        assertThat(committed.get().get(partition0).offset()).isGreaterThanOrEqualTo(4L);
+
+        container.stop();
+        pf.destroy();
+    }
+
+    @Test
+    void testFixTxOffsetsWithEmptyPollAdvance() throws Exception {
+        Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txEmptyPoll", false);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+        ContainerProperties containerProps = new ContainerProperties(topic13);
+        containerProps.setGroupId("txEmptyPoll");
+        containerProps.setPollTimeout(500L);
+        containerProps.setFixTxOffsets(true);
+        containerProps.setIdleEventInterval(1000L);
+
+        containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
+
+        DefaultKafkaProducerFactory<Integer, String> pf =
+                new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+        pf.setTransactionIdPrefix("tx.emptyPoll.");
+        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        container.setBeanName("testFixEmptyPoll");
+
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        container.setApplicationEventPublisher(event -> {
+            if (event instanceof ListenerContainerIdleEvent e) {
+                TopicPartition tp = new TopicPartition(topic13, 0);
+                committed.set(e.getConsumer().committed(Set.of(tp)));
+                latch.countDown();
+            }
+        });
+
+        container.start();
+
+        template.setDefaultTopic(topic13);
+        template.executeInTransaction(t -> {
+            template.sendDefault(0, 0, "msg1");
+            template.sendDefault(0, 0, "msg2");
+            template.sendDefault(0, 0, "msg3");
+            return null;
+        });
+
+        assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+        assertThat(committed.get().get(new TopicPartition(topic13, 0)).offset())
+                .isEqualTo(4L);
+        container.stop();
+    }
+
+    @Test
+    void testFixTxOffsetsRetainsLeaderEpoch() throws Exception {
+        Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txLeaderEpoch", false);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+        ContainerProperties containerProps = new ContainerProperties(topic14);
+        containerProps.setFixTxOffsets(true);
+        containerProps.setIdleEventInterval(1000L);
+
+        containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
+
+        DefaultKafkaProducerFactory<Integer, String> pf =
+                new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+        pf.setTransactionIdPrefix("tx.leaderEpoch.");
+        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+
+        AtomicReference<OffsetAndMetadata> committed = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        container.setApplicationEventPublisher(event -> {
+            if (event instanceof ListenerContainerIdleEvent e) {
+                TopicPartition tp = new TopicPartition(topic14, 0);
+                committed.set(e.getConsumer().committed(Set.of(tp)).get(tp));
+                latch.countDown();
+            }
+        });
+
+        container.start();
+
+        template.setDefaultTopic(topic14);
+        template.executeInTransaction(t -> {
+            template.sendDefault(0, 0, "data");
+            return null;
+        });
+
+        assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
+        assertThat(committed.get().leaderEpoch().isPresent()).isTrue();
+        container.stop();
+    }
+
+    @Test
+    void testFixLagWhenMaxPollEqualsTxBatchSize() throws Exception {
+        Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txTestPollLimit", false);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
+        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+
+        ContainerProperties containerProps = new ContainerProperties(topic15);
+        containerProps.setGroupId("txTestPollLimit");
+        containerProps.setPollTimeout(500L);
+        containerProps.setFixTxOffsets(true);
+        containerProps.setIdleEventInterval(1000L);
+        containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
+
+        DefaultKafkaProducerFactory<Integer, String> pf =
+                new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
+        pf.setTransactionIdPrefix("tx.polllimit.");
+
+        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        container.setBeanName("testFixLagPollLimit");
+
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        container.setApplicationEventPublisher(event -> {
+            if (event instanceof ListenerContainerIdleEvent e) {
+                committed.set(e.getConsumer().committed(Set.of(new TopicPartition(topic15, 0))));
+                latch.countDown();
+            }
+        });
+
+        container.start();
+
+        template.setDefaultTopic(topic15);
+        template.executeInTransaction(t -> {
+            template.sendDefault(0, 0, "msg1");
+            template.sendDefault(0, 0, "msg2");
+            template.sendDefault(0, 0, "msg3");
+            return null;
+        });
+
+        assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+        assertThat(committed.get().get(new TopicPartition(topic15, 0)).offset()).isEqualTo(4L);
+        container.stop();
+    }
+
 }
