@@ -21,11 +21,11 @@ package org.jetbrains.kotlinx.spark.api
 
 import io.kotest.core.Tag
 import io.kotest.core.extensions.install
-import io.kotest.core.spec.style.ShouldSpec
+import io.kotest.core.spec.style.FunSpec
 import io.kotest.extensions.testcontainers.TestContainerExtension
-import io.kotest.extensions.testcontainers.kafka.createProducer
+import io.kotest.extensions.testcontainers.kafka.createStringStringConsumer
 import io.kotest.extensions.testcontainers.kafka.createStringStringProducer
-import io.kotest.matchers.collections.shouldBeIn
+import io.kotest.matchers.collections.shouldContain
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -38,37 +38,36 @@ import org.apache.spark.streaming.kafka010.LocationStrategies
 import org.jetbrains.kotlinx.spark.api.tuples.*
 import org.testcontainers.containers.KafkaContainer
 import org.testcontainers.utility.DockerImageName
+import scala.Tuple3
 import java.io.Serializable
-import java.util.concurrent.TimeUnit
+import java.time.Duration
 
 object Kafka : Tag()
 
-class KafkaStreamingTest : ShouldSpec({
-
-    tags(Kafka)
-    val kafka = install(TestContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.4")))) {
-        withEmbeddedZookeeper()
-        withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
-    }
+class KafkaStreamingTest : FunSpec() {
+    init {
 
+        tags(Kafka)
 
-    context("kafka") {
-        val topic1 = "test1"
-        val topic2 = "test2"
+        val kafka =
+            install(TestContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")))) {
+                withEmbeddedZookeeper()
+                // withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+            }
+        println(kafka.bootstrapServers)
+        test("Streaming should support kafka") {
+            val topic1 = "test1"
+            val topic2 = "test2"
 
-        should("support kafka streams") {
-            val producer = kafka.createStringStringProducer()
-            listOf(
-                producer.send(ProducerRecord(topic1, "Hello this is a test test test")),
-                producer.send(ProducerRecord(topic2, "This is also also a test test something")),
-            )
-                .map { it.get(10, TimeUnit.SECONDS) }
-            producer.close()
+            val producer = autoClose(kafka.createStringStringProducer())
+            producer.send(ProducerRecord(topic1, "Hello this is a test test test"))
+            producer.send(ProducerRecord(topic2, "This is also also a test test something"))
 
             withSparkStreaming(
-                batchDuration = Durations.seconds(2),
+                batchDuration = Durations.milliseconds(1000),
                 appName = "KotlinDirectKafkaWordCount",
-                timeout = 1000L,
+                timeout = 10000L,
+                master = "local"
             ) {
 
                 val kafkaParams: Map<String, Serializable> = mapOf(
@@ -77,19 +76,18 @@ class KafkaStreamingTest : ShouldSpec({
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                 )
-
                 // Create direct kafka stream with brokers and topics
                 val messages: JavaInputDStream<ConsumerRecord<String, String>> = KafkaUtils.createDirectStream(
                     ssc,
-                    LocationStrategies.PreferConsistent(),
+                    LocationStrategies.PreferBrokers(),
                     ConsumerStrategies.Subscribe(setOf(topic1, topic2), kafkaParams),
                 )
 
                 // Get the lines, split them into words, count the words and print
-                val lines = messages.map { it.topic() X it.value() }
-                val words = lines.flatMapValues { it.split(" ").iterator() }
 
-                val wordCounts = words
+                val wordCounts = messages
+                    .map { it.topic() X it.value() }
+                    .flatMapValues { it.split(" ").iterator() }
                     .map { t(it, 1) }
                     .reduceByKey { a: Int, b: Int -> a + b }
                     .map { (tup, counter) -> tup + counter }
@@ -111,16 +109,18 @@ class KafkaStreamingTest : ShouldSpec({
                         "something" X 1,
                     )
                 )
-
+                val data = arrayListOf<List<Tuple3<String, String, Int>>>()
                 wordCounts.foreachRDD { rdd, _ ->
-                    rdd.foreach { (topic, word, count) ->
-                        t(word, count).shouldBeIn(collection = resultLists[topic]!!)
+                    data.add(rdd.collect())
+                }
+                ssc.awaitTerminationOrTimeout(10000)
+                resultLists.forEach { (topic, tuples) ->
+                    tuples.forEach { (word, count) ->
+                        data.flatten() shouldContain t(topic, word, count)
                     }
                 }
-
-                wordCounts.print()
             }
-        }
 
+        }
     }
-})
+}
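
Side note: the revised imports bring in createStringStringConsumer and java.time.Duration, but none of the hunks above show them in use. Below is a minimal sketch of how the consumer side of such a test could look, assuming the kotest extension pre-configures bootstrap servers and String deserializers for the container; the helper name, topic, and poll timeout are illustrative, not taken from the commit.

import io.kotest.extensions.testcontainers.kafka.createStringStringConsumer
import java.time.Duration
import org.testcontainers.containers.KafkaContainer

// Hypothetical helper: reads back whatever the producer wrote to a topic.
fun readBack(kafka: KafkaContainer, topic: String): List<String> {
    // The extension returns a ready-to-use KafkaConsumer<String, String>.
    val consumer = kafka.createStringStringConsumer()
    consumer.subscribe(listOf(topic))
    // Single poll with the Duration overload; 10 s is an arbitrary test timeout.
    val records = consumer.poll(Duration.ofSeconds(10))
    return records.map { it.value() }.also { consumer.close() }
}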