@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,8 +20,13 @@
 package org.jetbrains.kotlinx.spark.api
 
 import io.kotest.core.Tag
-import io.kotest.core.spec.style.ShouldSpec
-import io.kotest.matchers.collections.shouldBeIn
+import io.kotest.core.extensions.install
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.extensions.testcontainers.TestContainerExtension
+import io.kotest.extensions.testcontainers.kafka.createStringStringConsumer
+import io.kotest.extensions.testcontainers.kafka.createStringStringProducer
+import io.kotest.matchers.collections.shouldContain
+import io.kotest.matchers.collections.shouldContainAll
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -32,85 +37,94 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
 import org.apache.spark.streaming.kafka010.KafkaUtils
 import org.apache.spark.streaming.kafka010.LocationStrategies
 import org.jetbrains.kotlinx.spark.api.tuples.*
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
+import scala.Tuple3
 import java.io.Serializable
+import java.time.Duration
 
 object Kafka : Tag()
 
-class KafkaStreamingTest : ShouldSpec({
+class KafkaStreamingTest : FunSpec() {
+    init {
 
-    // making sure it can be skipped on Github actions since it times out
-    tags(Kafka)
+        tags(Kafka)
 
-    context("kafka") {
-        val port = 9092
-        val broker = "localhost:$port"
-        val topic1 = "test1"
-        val topic2 = "test2"
-        val kafkaListener = EmbeddedKafkaListener(port)
-        listener(kafkaListener)
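+        // Start a real Kafka broker in Docker; the Kotest Testcontainers extension
+        // manages the container's lifecycle for this spec.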
+        val kafka = install(
+            TestContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")))
+        ) {
+            withEmbeddedZookeeper()
+            withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+        }
+        println(kafka.bootstrapServers)
+        test("Streaming should support kafka") {
+            val topic1 = "test1"
+            val topic2 = "test2"
 
-        should("support kafka streams") {
-            val producer = kafkaListener.stringStringProducer()
-            producer.send(ProducerRecord(topic1, "Hello this is a test test test"))
-            producer.send(ProducerRecord(topic2, "This is also also a test test something"))
-            producer.close()
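+            // Expected word counts per topic, as (word X count) tuples.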
+            val resultLists = mapOf(
+                topic1 to listOf(
+                    "Hello" X 1,
+                    "this" X 1,
+                    "is" X 1,
+                    "a" X 1,
+                    "test" X 3,
+                ),
+                topic2 to listOf(
+                    "This" X 1,
+                    "is" X 1,
+                    "also" X 2,
+                    "a" X 1,
+                    "test" X 2,
+                    "something" X 1,
+                )
+            )
+            val data = arrayListOf<List<Tuple3<String, String, Int>>>()
 
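+            // Run a local streaming context; the timeout stops it after ten seconds
+            // so that the collected batches can be checked afterwards.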
             withSparkStreaming(
-                batchDuration = Durations.seconds(2),
+                batchDuration = Durations.milliseconds(1000),
                 appName = "KotlinDirectKafkaWordCount",
-                timeout = 1000L,
+                timeout = 10_000L,
+                master = "local"
             ) {
 
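+                // Produce the test messages only after the streaming context has started,
+                // so the first micro-batches can pick them up.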
+                setRunAfterStart {
+                    val producer = autoClose(kafka.createStringStringProducer())
+                    producer.send(ProducerRecord(topic1, "Hello this is a test test test"))
+                    producer.send(ProducerRecord(topic2, "This is also also a test test something"))
+                }
+
                 val kafkaParams: Map<String, Serializable> = mapOf(
-                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to broker,
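+                    // Address the broker through the host port Testcontainers mapped for it.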
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${kafka.host}:${kafka.getMappedPort(KafkaContainer.KAFKA_PORT)}",
                     ConsumerConfig.GROUP_ID_CONFIG to "consumer-group",
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                 )
-
                 // Create direct kafka stream with brokers and topics
                 val messages: JavaInputDStream<ConsumerRecord<String, String>> = KafkaUtils.createDirectStream(
                     ssc,
-                    LocationStrategies.PreferConsistent(),
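+                    // PreferBrokers schedules partitions on the hosts that run the brokers.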
+                    LocationStrategies.PreferBrokers(),
                     ConsumerStrategies.Subscribe(setOf(topic1, topic2), kafkaParams),
                 )
 
                 // Get the lines, split them into words, count the words and print
-                val lines = messages.map { it.topic() X it.value() }
-                val words = lines.flatMapValues { it.split(" ").iterator() }
 
-                val wordCounts = words
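+                // (topic, line) -> (topic, word) -> ((topic, word), 1) -> ((topic, word), n) -> (topic, word, n)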
+                val wordCounts = messages
+                    .map { it.topic() X it.value() }
+                    .flatMapValues { it.split(" ").iterator() }
                     .map { t(it, 1) }
                     .reduceByKey { a: Int, b: Int -> a + b }
                     .map { (tup, counter) -> tup + counter }
 
-                val resultLists = mapOf(
-                    topic1 to listOf(
-                        "Hello" X 1,
-                        "this" X 1,
-                        "is" X 1,
-                        "a" X 1,
-                        "test" X 3,
-                    ),
-                    topic2 to listOf(
-                        "This" X 1,
-                        "is" X 1,
-                        "also" X 2,
-                        "a" X 1,
-                        "test" X 2,
-                        "something" X 1,
-                    )
-                )
 
                 wordCounts.foreachRDD { rdd, _ ->
-                    rdd.foreach { (topic, word, count) ->
-                        t(word, count).shouldBeIn(collection = resultLists[topic]!!)
-                    }
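+                    // Collect every micro-batch to the driver; the assertion runs
+                    // once the streaming context has stopped.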
+                    data.add(rdd.collect())
                 }
+            }
 
-                wordCounts.print()
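+            // Flatten the expectations to (topic, word, count) and verify each was observed.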
+            val resultList = resultLists.flatMap { (topic, tuples) ->
+                tuples.map { it.prependedBy(topic) }
             }
+            data.flatten() shouldContainAll resultList
         }
-
     }
-})
+}