@@ -1232,4 +1232,69 @@ public void afterRecord(
12321232 container .stop ();
12331233 pf .destroy ();
12341234 }
1235+
1236+ @ SuppressWarnings ("unchecked" )
1237+ @ Test
1238+ void testFixTxOffsetsWithReadUncommitted () throws Exception {
1239+ Map <String , Object > props = KafkaTestUtils .consumerProps (embeddedKafka , "txReadUncommittedTest" , false );
1240+ props .put (ConsumerConfig .ISOLATION_LEVEL_CONFIG , "read_uncommitted" );
1241+ DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
1242+ ContainerProperties containerProps = new ContainerProperties (topic7 );
1243+ containerProps .setGroupId ("txReadUncommittedTest" );
1244+ containerProps .setPollTimeout (500L );
1245+ containerProps .setIdleEventInterval (500L );
1246+ containerProps .setFixTxOffsets (true );
1247+
1248+ Map <String , Object > senderProps = KafkaTestUtils .producerProps (embeddedKafka );
1249+ DefaultKafkaProducerFactory <Integer , String > pf = new DefaultKafkaProducerFactory <>(senderProps );
1250+ pf .setTransactionIdPrefix ("readUncommitted." );
1251+
1252+ final KafkaTemplate <Integer , String > template = new KafkaTemplate <>(pf );
1253+ final AtomicInteger messageCount = new AtomicInteger ();
1254+ final CountDownLatch latch = new CountDownLatch (3 );
1255+ containerProps .setMessageListener ((MessageListener <Integer , String >) message -> {
1256+ messageCount .incrementAndGet ();
1257+ latch .countDown ();
1258+ });
1259+
1260+ @ SuppressWarnings ({ "rawtypes" })
1261+ KafkaTransactionManager tm = new KafkaTransactionManager (pf );
1262+ containerProps .setKafkaAwareTransactionManager (tm );
1263+ KafkaMessageListenerContainer <Integer , String > container =
1264+ new KafkaMessageListenerContainer <>(cf , containerProps );
1265+ container .setBeanName ("testFixTxOffsetsWithReadUncommitted" );
1266+
1267+ AtomicReference <Map <TopicPartition , OffsetAndMetadata >> committed = new AtomicReference <>();
1268+ CountDownLatch idleLatch = new CountDownLatch (1 );
1269+ container .setApplicationEventPublisher (event -> {
1270+ if (event instanceof ListenerContainerIdleEvent ) {
1271+ Consumer <?, ?> consumer = ((ListenerContainerIdleEvent ) event ).getConsumer ();
1272+ committed .set (consumer .committed (
1273+ Collections .singleton (new TopicPartition (topic7 , 0 ))));
1274+ idleLatch .countDown ();
1275+ }
1276+ });
1277+
1278+ container .start ();
1279+
1280+ template .setDefaultTopic (topic7 );
1281+ template .executeInTransaction (t -> {
1282+ template .sendDefault (0 , 0 , "msg1" );
1283+ template .sendDefault (0 , 0 , "msg2" );
1284+ template .sendDefault (0 , 0 , "msg3" );
1285+ return null ;
1286+ });
1287+
1288+ assertThat (latch .await (60 , TimeUnit .SECONDS )).isTrue ();
1289+ assertThat (idleLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
1290+
1291+ assertThat (messageCount .get ()).isGreaterThanOrEqualTo (3 );
1292+ TopicPartition partition0 = new TopicPartition (topic7 , 0 );
1293+ assertThat (committed .get ().get (partition0 )).isNotNull ();
1294+ assertThat (committed .get ().get (partition0 ).offset ()).isGreaterThanOrEqualTo (3L );
1295+
1296+ container .stop ();
1297+ pf .destroy ();
1298+ }
1299+
12351300}
0 commit comments