2525import java .util .concurrent .CompletableFuture ;
2626import java .util .concurrent .ExecutionException ;
2727
28+ import org .apache .kafka .clients .admin .AdminClient ;
29+ import org .apache .kafka .clients .admin .AdminClientConfig ;
2830import org .apache .kafka .clients .admin .NewTopic ;
2931import org .apache .kafka .clients .consumer .ConsumerConfig ;
3032import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
3638import org .junit .jupiter .api .extension .ExtendWith ;
3739
3840import org .springframework .batch .item .ExecutionContext ;
39- import org .springframework .beans .factory .annotation .Autowired ;
4041import org .springframework .kafka .core .DefaultKafkaProducerFactory ;
4142import org .springframework .kafka .core .KafkaTemplate ;
4243import org .springframework .kafka .core .ProducerFactory ;
43- import org .springframework .kafka .test .EmbeddedKafkaBroker ;
44- import org .springframework .kafka .test .context .EmbeddedKafka ;
4544import org .springframework .kafka .test .utils .KafkaTestUtils ;
4645import org .springframework .test .context .junit .jupiter .SpringExtension ;
46+ import org .testcontainers .junit .jupiter .Container ;
47+ import org .testcontainers .junit .jupiter .Testcontainers ;
48+ import org .testcontainers .kafka .KafkaContainer ;
49+ import org .testcontainers .utility .DockerImageName ;
4750
4851import static org .hamcrest .MatcherAssert .assertThat ;
4952import static org .hamcrest .Matchers .containsInAnyOrder ;
5457/**
5558 * @author Mathieu Ouellet
5659 * @author Mahmoud Ben Hassine
60+ * @author François Martin
61+ * @author Patrick Baumgartner
5762 */
58- @ EmbeddedKafka
63+ @ Testcontainers ( disabledWithoutDocker = true )
5964@ ExtendWith (SpringExtension .class )
6065class KafkaItemReaderIntegrationTests {
6166
62- @ Autowired
63- private EmbeddedKafkaBroker embeddedKafka ;
67+ private static final DockerImageName KAFKA_IMAGE = DockerImageName .parse ("apache/kafka:3.9.1" );
68+
69+ @ Container
70+ public static KafkaContainer kafka = new KafkaContainer (KAFKA_IMAGE );
6471
6572 private KafkaItemReader <String , String > reader ;
6673
@@ -69,21 +76,24 @@ class KafkaItemReaderIntegrationTests {
6976 private Properties consumerProperties ;
7077
7178 @ BeforeAll
72- static void setUpTopics (@ Autowired EmbeddedKafkaBroker embeddedKafka ) {
73- embeddedKafka .addTopics (new NewTopic ("topic1" , 1 , (short ) 1 ), new NewTopic ("topic2" , 2 , (short ) 1 ),
74- new NewTopic ("topic3" , 1 , (short ) 1 ), new NewTopic ("topic4" , 2 , (short ) 1 ),
75- new NewTopic ("topic5" , 1 , (short ) 1 ), new NewTopic ("topic6" , 1 , (short ) 1 ));
79+ static void setUpTopics () {
80+ Properties properties = new Properties ();
81+ properties .put (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , kafka .getBootstrapServers ());
82+ try (AdminClient adminClient = AdminClient .create (properties )) {
83+ adminClient .createTopics (List .of (new NewTopic ("topic1" , 1 , (short ) 1 ), new NewTopic ("topic2" , 2 , (short ) 1 ),
84+ new NewTopic ("topic3" , 1 , (short ) 1 ), new NewTopic ("topic4" , 2 , (short ) 1 ),
85+ new NewTopic ("topic5" , 1 , (short ) 1 ), new NewTopic ("topic6" , 1 , (short ) 1 )));
86+ }
7687 }
7788
7889 @ BeforeEach
7990 void setUp () {
80- Map <String , Object > producerProperties = KafkaTestUtils .producerProps (embeddedKafka );
91+ Map <String , Object > producerProperties = KafkaTestUtils .producerProps (kafka . getBootstrapServers () );
8192 ProducerFactory <String , String > producerFactory = new DefaultKafkaProducerFactory <>(producerProperties );
8293 this .template = new KafkaTemplate <>(producerFactory );
8394
8495 this .consumerProperties = new Properties ();
85- this .consumerProperties .setProperty (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
86- embeddedKafka .getBrokersAsString ());
96+ this .consumerProperties .setProperty (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafka .getBootstrapServers ());
8797 this .consumerProperties .setProperty (ConsumerConfig .GROUP_ID_CONFIG , "1" );
8898 this .consumerProperties .setProperty (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG ,
8999 StringDeserializer .class .getName ());
@@ -186,8 +196,8 @@ void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
186196 this .reader .close ();
187197
188198 // The offset stored in Kafka should be equal to 2 at this point
189- OffsetAndMetadata currentOffset = KafkaTestUtils .getCurrentOffset (embeddedKafka . getBrokersAsString (), "1" ,
190- "topic6" , 0 );
199+ OffsetAndMetadata currentOffset = KafkaTestUtils .getCurrentOffset (kafka . getBootstrapServers (), "1" , "topic6 " ,
200+ 0 );
191201 assertEquals (2 , currentOffset .offset ());
192202
193203 // second run (with same consumer group ID): new messages arrived since the last
0 commit comments