Commit 54b9d10

updating readme and example
1 parent 4dfb747 commit 54b9d10

2 files changed: +52 -9 lines changed


README.md

Lines changed: 43 additions & 0 deletions
@@ -23,6 +23,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
 - [Column infix/operator functions](#column-infixoperator-functions)
 - [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
 - [Tuples](#tuples)
+- [Streaming](#streaming)
 - [Examples](#examples)
 - [Reporting issues/Support](#reporting-issuessupport)
 - [Code of Conduct](#code-of-conduct)
@@ -267,6 +268,48 @@ Finally, all these tuple helper functions are also baked in:
 - `map`
 - `cast`

+### Streaming
+
+A popular Spark extension is [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html).
+Of course, the Kotlin Spark API also introduces a more Kotlin-esque approach to writing your streaming programs.
+There are examples for use with a checkpoint, Kafka, and SQL in the [examples module](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming).
+
+Here is a quick example:
+```kotlin
+// Automatically provides ssc: JavaStreamingContext, which starts and awaits termination or timeout
+withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession
+
+    // create an input stream for, for instance, Netcat: `$ nc -lk 9999`
+    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)
+
+    // split the input stream on spaces
+    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }
+
+    // perform an action on each RDD formed in the stream
+    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
+
+        // to convert the JavaRDD to a Dataset, we need a Spark session using the RDD's context
+        withSpark(rdd) { // this: KSparkSession
+            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
+            dataframe
+                .groupByKey { it.word }
+                .count()
+                .show()
+            // +-----+--------+
+            // |  key|count(1)|
+            // +-----+--------+
+            // |hello|       1|
+            // |   is|       1|
+            // |    a|       1|
+            // | this|       1|
+            // | test|       3|
+            // +-----+--------+
+        }
+    }
+}
+```
+

 ## Examples

 For more, check out the [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
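Note that the README snippet above references a `TestRow` class that the section itself never defines; it lives in the example file changed below. Judging from its usage (`TestRow(word = it)` and `groupByKey { it.word }`), a minimal stand-in is a one-field data class. This is a sketch inferred from the diff, not the verbatim declaration from the repository:

```kotlin
// Minimal sketch of the row type the README snippet assumes:
// a single String field named `word`, which the example groups and counts on.
data class TestRow(
    val word: String,
)
```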

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/Streaming.kt

Lines changed: 9 additions & 9 deletions
@@ -19,8 +19,12 @@
  */
 package org.jetbrains.kotlinx.spark.examples.streaming

+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.streaming.Durations
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 import org.jetbrains.kotlinx.spark.api.*

 data class TestRow(
@@ -32,22 +36,18 @@ data class TestRow(
  *
  * `$ nc -lk 9999`
  */
-fun main() = withSparkStreaming(Durations.seconds(1), timeout = 10_000) {
+fun main() = withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

-    val lines = ssc.socketTextStream("localhost", 9999)
-    val words = lines.flatMap { it.split(" ").iterator() }
-
-    words.foreachRDD { rdd, _ ->
-        withSpark(rdd) {
+    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)
+    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

+    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
+        withSpark(rdd) { // this: KSparkSession
             val dataframe: Dataset<TestRow> = rdd.map { TestRow(it) }.toDS()
-
             dataframe
                 .groupByKey { it.word }
                 .count()
                 .show()
         }
-
     }
-
 }

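Pieced together from the two hunks above, the example after this commit reads roughly as follows. Treat this as a sketch for orientation, not the verbatim file: the license header and KDoc are elided, and the body of `TestRow` is inferred from its usage in the diff:

```kotlin
package org.jetbrains.kotlinx.spark.examples.streaming

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Dataset
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.jetbrains.kotlinx.spark.api.*

// Field inferred from `TestRow(it)` and `groupByKey { it.word }` in the diff.
data class TestRow(val word: String)

// Counts words received from a local Netcat session (`$ nc -lk 9999`),
// printing one count table per 1-second batch until the 10_000 ms timeout elapses.
fun main() = withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
        }
    }
}
```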