Skip to content

Commit 63961e4

Browse files
committed
test : add test
Signed-off-by: Su Ko <rhtn1128@gmail.com>
1 parent f75b4b8 commit 63961e4

File tree

2 files changed

+171
-16
lines changed

2 files changed

+171
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,9 +1534,10 @@ private void doProcessCommits() {
15341534
}
15351535

15361536
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
1537+
saveLastRecordsIfNeeded(records);
1538+
15371539
if (records != null && records.count() > 0) {
15381540
this.receivedSome = true;
1539-
saveLastRecordsIfNeeded(records);
15401541
notIdle();
15411542
notIdlePartitions(records.partitions());
15421543
invokeListener(records);
@@ -1608,8 +1609,8 @@ private void notIdle() {
16081609
}
16091610
}
16101611

1611-
private void saveLastRecordsIfNeeded(ConsumerRecords<K, V> records) {
1612-
if (this.fixTxOffsets) {
1612+
private void saveLastRecordsIfNeeded(@Nullable ConsumerRecords<K, V> records) {
1613+
if (this.fixTxOffsets && records != null && !records.nextOffsets().isEmpty()) {
16131614
this.lastRecords = records;
16141615
}
16151616
}
@@ -1647,7 +1648,7 @@ private void fixTxOffsetsIfNeeded() {
16471648
}
16481649

16491650
// Only fix if we have processed records and the next offset is greater than committed
1650-
if (!this.lastRecords.records(tp).isEmpty() && nextOffset.offset() > committed.offset()) {
1651+
if (this.lastRecords.nextOffsets().get(tp) != null && nextOffset.offset() > committed.offset()) {
16511652
toFix.put(tp, nextOffset);
16521653
}
16531654
});
@@ -1662,16 +1663,15 @@ private void fixTxOffsetsIfNeeded() {
16621663
Objects.requireNonNull(this.transactionTemplate)
16631664
.executeWithoutResult(status -> doSendOffsets(getTxProducer(), toFix));
16641665
}
1666+
1667+
ListenerConsumer.this.lastCommits.clear();
1668+
this.lastRecords = null;
16651669
}
16661670
}
16671671
catch (Exception e) {
16681672
this.logger.error(e, () -> "Failed to correct transactional offset(s): "
16691673
+ ListenerConsumer.this.lastCommits);
16701674
}
1671-
finally {
1672-
ListenerConsumer.this.lastCommits.clear();
1673-
this.lastRecords = null;
1674-
}
16751675
}
16761676

16771677
private @Nullable ConsumerRecords<K, V> doPoll() {

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 163 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@
116116
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
117117
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
118118
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
119-
TransactionalContainerTests.topic10},
119+
TransactionalContainerTests.topic10, TransactionalContainerTests.topic11, TransactionalContainerTests.topic12,
120+
TransactionalContainerTests.topic13, TransactionalContainerTests.topic14, TransactionalContainerTests.topic15
121+
},
120122
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
121123
public class TransactionalContainerTests {
122124

@@ -146,6 +148,15 @@ public class TransactionalContainerTests {
146148

147149
public static final String topic11 = "txTopic11";
148150

151+
public static final String topic12 = "txTopic12";
152+
153+
public static final String topic13 = "txTopic13";
154+
155+
public static final String topic14 = "txTopic14";
156+
157+
public static final String topic15 = "txTopic15";
158+
159+
149160
private static EmbeddedKafkaBroker embeddedKafka;
150161

151162
@BeforeAll
@@ -1239,7 +1250,7 @@ void testFixTxOffsetsWithReadUncommitted() throws Exception {
12391250
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txReadUncommittedTest", false);
12401251
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
12411252
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1242-
ContainerProperties containerProps = new ContainerProperties(topic7);
1253+
ContainerProperties containerProps = new ContainerProperties(topic12);
12431254
containerProps.setGroupId("txReadUncommittedTest");
12441255
containerProps.setPollTimeout(500L);
12451256
containerProps.setIdleEventInterval(500L);
@@ -1251,7 +1262,7 @@ void testFixTxOffsetsWithReadUncommitted() throws Exception {
12511262

12521263
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
12531264
final AtomicInteger messageCount = new AtomicInteger();
1254-
final CountDownLatch latch = new CountDownLatch(3);
1265+
final CountDownLatch latch = new CountDownLatch(1);
12551266
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
12561267
messageCount.incrementAndGet();
12571268
latch.countDown();
@@ -1270,14 +1281,14 @@ void testFixTxOffsetsWithReadUncommitted() throws Exception {
12701281
if (event instanceof ListenerContainerIdleEvent) {
12711282
Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
12721283
committed.set(consumer.committed(
1273-
Collections.singleton(new TopicPartition(topic7, 0))));
1284+
Collections.singleton(new TopicPartition(topic12, 0))));
12741285
idleLatch.countDown();
12751286
}
12761287
});
12771288

12781289
container.start();
12791290

1280-
template.setDefaultTopic(topic7);
1291+
template.setDefaultTopic(topic12);
12811292
template.executeInTransaction(t -> {
12821293
template.sendDefault(0, 0, "msg1");
12831294
template.sendDefault(0, 0, "msg2");
@@ -1286,15 +1297,159 @@ void testFixTxOffsetsWithReadUncommitted() throws Exception {
12861297
});
12871298

12881299
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
1289-
assertThat(idleLatch.await(10, TimeUnit.SECONDS)).isTrue();
1300+
assertThat(idleLatch.await(60, TimeUnit.SECONDS)).isTrue();
12901301

12911302
assertThat(messageCount.get()).isGreaterThanOrEqualTo(3);
1292-
TopicPartition partition0 = new TopicPartition(topic7, 0);
1303+
TopicPartition partition0 = new TopicPartition(topic12, 0);
12931304
assertThat(committed.get().get(partition0)).isNotNull();
1294-
assertThat(committed.get().get(partition0).offset()).isGreaterThanOrEqualTo(3L);
1305+
1306+
// 0 1 2 3(tx marker) => next offset 4
1307+
assertThat(committed.get().get(partition0).offset()).isGreaterThanOrEqualTo(4L);
12951308

12961309
container.stop();
12971310
pf.destroy();
12981311
}
12991312

1313+
@Test
1314+
void testFixTxOffsetsWithEmptyPollAdvance() throws Exception {
1315+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txEmptyPoll", false);
1316+
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1317+
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
1318+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1319+
1320+
ContainerProperties containerProps = new ContainerProperties(topic13);
1321+
containerProps.setGroupId("txEmptyPoll");
1322+
containerProps.setPollTimeout(500L);
1323+
containerProps.setFixTxOffsets(true);
1324+
containerProps.setIdleEventInterval(1000L);
1325+
1326+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1327+
1328+
DefaultKafkaProducerFactory<Integer, String> pf =
1329+
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
1330+
pf.setTransactionIdPrefix("tx.emptyPoll.");
1331+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
1332+
1333+
KafkaMessageListenerContainer<Integer, String> container =
1334+
new KafkaMessageListenerContainer<>(cf, containerProps);
1335+
container.setBeanName("testFixEmptyPoll");
1336+
1337+
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
1338+
CountDownLatch latch = new CountDownLatch(1);
1339+
1340+
container.setApplicationEventPublisher(event -> {
1341+
if (event instanceof ListenerContainerIdleEvent e) {
1342+
TopicPartition tp = new TopicPartition(topic13, 0);
1343+
committed.set(e.getConsumer().committed(Set.of(tp)));
1344+
latch.countDown();
1345+
}
1346+
});
1347+
1348+
container.start();
1349+
1350+
template.setDefaultTopic(topic13);
1351+
template.executeInTransaction(t -> {
1352+
template.sendDefault(0, 0, "msg1");
1353+
template.sendDefault(0, 0, "msg2");
1354+
template.sendDefault(0, 0, "msg3");
1355+
return null;
1356+
});
1357+
1358+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
1359+
assertThat(committed.get().get(new TopicPartition(topic13, 0)).offset())
1360+
.isEqualTo(4L);
1361+
container.stop();
1362+
}
1363+
1364+
@Test
1365+
void testFixTxOffsetsRetainsLeaderEpoch() throws Exception {
1366+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txLeaderEpoch", false);
1367+
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1368+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1369+
1370+
ContainerProperties containerProps = new ContainerProperties(topic14);
1371+
containerProps.setFixTxOffsets(true);
1372+
containerProps.setIdleEventInterval(1000L);
1373+
1374+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1375+
1376+
DefaultKafkaProducerFactory<Integer, String> pf =
1377+
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
1378+
pf.setTransactionIdPrefix("tx.leaderEpoch.");
1379+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
1380+
1381+
KafkaMessageListenerContainer<Integer, String> container =
1382+
new KafkaMessageListenerContainer<>(cf, containerProps);
1383+
1384+
AtomicReference<OffsetAndMetadata> committed = new AtomicReference<>();
1385+
CountDownLatch latch = new CountDownLatch(1);
1386+
1387+
container.setApplicationEventPublisher(event -> {
1388+
if (event instanceof ListenerContainerIdleEvent e) {
1389+
TopicPartition tp = new TopicPartition(topic14, 0);
1390+
committed.set(e.getConsumer().committed(Set.of(tp)).get(tp));
1391+
latch.countDown();
1392+
}
1393+
});
1394+
1395+
container.start();
1396+
1397+
template.setDefaultTopic(topic14);
1398+
template.executeInTransaction(t -> {
1399+
template.sendDefault(0, 0, "data");
1400+
return null;
1401+
});
1402+
1403+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
1404+
assertThat(committed.get().leaderEpoch()).isNotNull();
1405+
container.stop();
1406+
}
1407+
1408+
@Test
1409+
void testFixLagWhenMaxPollEqualsTxBatchSize() throws Exception {
1410+
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "txTestPollLimit", false);
1411+
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1412+
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
1413+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1414+
1415+
ContainerProperties containerProps = new ContainerProperties(topic15);
1416+
containerProps.setGroupId("txTestPollLimit");
1417+
containerProps.setPollTimeout(500L);
1418+
containerProps.setFixTxOffsets(true);
1419+
containerProps.setIdleEventInterval(1000L);
1420+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1421+
1422+
DefaultKafkaProducerFactory<Integer, String> pf =
1423+
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
1424+
pf.setTransactionIdPrefix("tx.polllimit.");
1425+
1426+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
1427+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
1428+
container.setBeanName("testFixLagPollLimit");
1429+
1430+
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
1431+
CountDownLatch latch = new CountDownLatch(1);
1432+
1433+
container.setApplicationEventPublisher(event -> {
1434+
if (event instanceof ListenerContainerIdleEvent e) {
1435+
committed.set(e.getConsumer().committed(Set.of(new TopicPartition(topic15, 0))));
1436+
latch.countDown();
1437+
}
1438+
});
1439+
1440+
container.start();
1441+
1442+
template.setDefaultTopic(topic15);
1443+
template.executeInTransaction(t -> {
1444+
template.sendDefault(0, 0, "msg1");
1445+
template.sendDefault(0, 0, "msg2");
1446+
template.sendDefault(0, 0, "msg3");
1447+
return null;
1448+
});
1449+
1450+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
1451+
assertThat(committed.get().get(new TopicPartition(topic15, 0)).offset()).isEqualTo(4L);
1452+
container.stop();
1453+
}
1454+
13001455
}

0 commit comments

Comments
 (0)