@@ -26,6 +26,7 @@ import io.kotest.extensions.testcontainers.TestContainerExtension
2626import io.kotest.extensions.testcontainers.kafka.createStringStringConsumer
2727import io.kotest.extensions.testcontainers.kafka.createStringStringProducer
2828import io.kotest.matchers.collections.shouldContain
29+ import io.kotest.matchers.collections.shouldContainAll
2930import org.apache.kafka.clients.consumer.ConsumerConfig
3031import org.apache.kafka.clients.consumer.ConsumerRecord
3132import org.apache.kafka.clients.producer.ProducerRecord
@@ -49,29 +50,51 @@ class KafkaStreamingTest : FunSpec() {
4950
5051 tags(Kafka )
5152
52- val kafka =
53- install(TestContainerExtension (KafkaContainer (DockerImageName .parse(" confluentinc/cp-kafka:7.0.1" )))) {
54- withEmbeddedZookeeper()
55- // withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
56- }
53+ val kafka = install(
54+ TestContainerExtension (KafkaContainer (DockerImageName .parse(" confluentinc/cp-kafka:7.0.1" )))
55+ ) {
56+ withEmbeddedZookeeper()
57+ withEnv(" KAFKA_AUTO_CREATE_TOPICS_ENABLE" , " true" )
58+ }
5759 println (kafka.bootstrapServers)
5860 test(" Streaming should support kafka" ) {
5961 val topic1 = " test1"
6062 val topic2 = " test2"
6163
62- val producer = autoClose(kafka.createStringStringProducer())
63- producer.send(ProducerRecord (topic1, " Hello this is a test test test" ))
64- producer.send(ProducerRecord (topic2, " This is also also a test test something" ))
64+ val resultLists = mapOf (
65+ topic1 to listOf (
66+ " Hello" X 1 ,
67+ " this" X 1 ,
68+ " is" X 1 ,
69+ " a" X 1 ,
70+ " test" X 3 ,
71+ ),
72+ topic2 to listOf (
73+ " This" X 1 ,
74+ " is" X 1 ,
75+ " also" X 2 ,
76+ " a" X 1 ,
77+ " test" X 2 ,
78+ " something" X 1 ,
79+ )
80+ )
81+ val data = arrayListOf<List <Tuple3 <String , String , Int >>>()
6582
6683 withSparkStreaming(
6784 batchDuration = Durations .milliseconds(1000 ),
6885 appName = " KotlinDirectKafkaWordCount" ,
69- timeout = 10000L ,
86+ timeout = 10_000L ,
7087 master = " local"
7188 ) {
7289
90+ setRunAfterStart {
91+ val producer = autoClose(kafka.createStringStringProducer())
92+ producer.send(ProducerRecord (topic1, " Hello this is a test test test" ))
93+ producer.send(ProducerRecord (topic2, " This is also also a test test something" ))
94+ }
95+
7396 val kafkaParams: Map <String , Serializable > = mapOf (
74- ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers ,
97+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG to " ${ kafka.host} : ${kafka.getMappedPort( KafkaContainer . KAFKA_PORT )} " ,
7598 ConsumerConfig .GROUP_ID_CONFIG to " consumer-group" ,
7699 ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java,
77100 ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer ::class .java,
@@ -92,35 +115,16 @@ class KafkaStreamingTest : FunSpec() {
92115 .reduceByKey { a: Int , b: Int -> a + b }
93116 .map { (tup, counter) -> tup + counter }
94117
95- val resultLists = mapOf (
96- topic1 to listOf (
97- " Hello" X 1 ,
98- " this" X 1 ,
99- " is" X 1 ,
100- " a" X 1 ,
101- " test" X 3 ,
102- ),
103- topic2 to listOf (
104- " This" X 1 ,
105- " is" X 1 ,
106- " also" X 2 ,
107- " a" X 1 ,
108- " test" X 2 ,
109- " something" X 1 ,
110- )
111- )
112- val data = arrayListOf<List <Tuple3 <String , String , Int >>>()
118+
113119 wordCounts.foreachRDD { rdd, _ ->
114120 data.add(rdd.collect())
115121 }
116- ssc.awaitTerminationOrTimeout(10000 )
117- resultLists.forEach { (topic, tuples) ->
118- tuples.forEach { (word, count) ->
119- data shouldContain t(topic, word, count)
120- }
121- }
122122 }
123123
124+ val resultList = resultLists.flatMap { (topic, tuples) ->
125+ tuples.map { it.prependedBy(topic) }
126+ }
127+ data.flatten() shouldContainAll resultList
124128 }
125129 }
126130}
0 commit comments