Skip to content

Commit 5108508

Browse files
committed
Added Jupyter streaming example
1 parent f78a52f commit 5108508

File tree

1 file changed

+191
-0
lines changed

1 file changed

+191
-0
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"source": [
6+
"By default, the latest version of the API and the latest supported Spark version are chosen. To specify your own, use `%use spark-streaming(spark=3.2, v=1.1.0)`."
7+
],
8+
"metadata": {
9+
"collapsed": false,
10+
"pycharm": {
11+
"name": "#%% md\n"
12+
}
13+
}
14+
},
15+
{
16+
"cell_type": "code",
17+
"execution_count": 2,
18+
"outputs": [
19+
{
20+
"name": "stdout",
21+
"output_type": "stream",
22+
"text": [
23+
"To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.\n"
24+
]
25+
}
26+
],
27+
"source": [
28+
"%use spark-streaming"
29+
],
30+
"metadata": {
31+
"collapsed": false,
32+
"pycharm": {
33+
"name": "#%%\n"
34+
}
35+
}
36+
},
37+
{
38+
"cell_type": "markdown",
39+
"source": [
40+
"Let's define a data class to work with."
41+
],
42+
"metadata": {
43+
"collapsed": false,
44+
"pycharm": {
45+
"name": "#%% md\n"
46+
}
47+
}
48+
},
49+
{
50+
"cell_type": "code",
51+
"execution_count": 4,
52+
"outputs": [],
53+
"source": [
54+
"data class TestRow(\n",
55+
" val word: String,\n",
56+
")"
57+
],
58+
"metadata": {
59+
"collapsed": false,
60+
"pycharm": {
61+
"name": "#%%\n"
62+
}
63+
}
64+
},
65+
{
66+
"cell_type": "markdown",
67+
"source": [
68+
"To run this on your local machine, you need to first run a Netcat server: `$ nc -lk 9999`.\n",
69+
"\n",
70+
"This example will collect the data from this stream for 10 seconds in 1-second intervals, splitting and counting the input per word."
71+
],
72+
"metadata": {
73+
"collapsed": false,
74+
"pycharm": {
75+
"name": "#%% md\n"
76+
}
77+
}
78+
},
79+
{
80+
"cell_type": "code",
81+
"execution_count": 5,
82+
"outputs": [
83+
{
84+
"name": "stdout",
85+
"output_type": "stream",
86+
"text": [
87+
"+---+--------+\n",
88+
"|key|count(1)|\n",
89+
"+---+--------+\n",
90+
"+---+--------+\n",
91+
"\n",
92+
"+-----+--------+\n",
93+
"| key|count(1)|\n",
94+
"+-----+--------+\n",
95+
"|hello| 8|\n",
96+
"|Hello| 6|\n",
97+
"|world| 3|\n",
98+
"| | 2|\n",
99+
"| test| 4|\n",
100+
"+-----+--------+\n",
101+
"\n",
102+
"+-----+--------+\n",
103+
"| key|count(1)|\n",
104+
"+-----+--------+\n",
105+
"|hello| 3|\n",
106+
"+-----+--------+\n",
107+
"\n",
108+
"+---+--------+\n",
109+
"|key|count(1)|\n",
110+
"+---+--------+\n",
111+
"+---+--------+\n",
112+
"\n",
113+
"+---+--------+\n",
114+
"|key|count(1)|\n",
115+
"+---+--------+\n",
116+
"+---+--------+\n",
117+
"\n",
118+
"+---+--------+\n",
119+
"|key|count(1)|\n",
120+
"+---+--------+\n",
121+
"+---+--------+\n",
122+
"\n",
123+
"+---+--------+\n",
124+
"|key|count(1)|\n",
125+
"+---+--------+\n",
126+
"+---+--------+\n",
127+
"\n",
128+
"+---+--------+\n",
129+
"|key|count(1)|\n",
130+
"+---+--------+\n",
131+
"+---+--------+\n",
132+
"\n",
133+
"+-----+--------+\n",
134+
"| key|count(1)|\n",
135+
"+-----+--------+\n",
136+
"|hello| 1|\n",
137+
"|world| 2|\n",
138+
"+-----+--------+\n",
139+
"\n",
140+
"+---+--------+\n",
141+
"|key|count(1)|\n",
142+
"+---+--------+\n",
143+
"+---+--------+\n",
144+
"\n"
145+
]
146+
}
147+
],
148+
"source": [
149+
"withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession\n",
150+
"\n",
151+
" val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream(\"localhost\", 9999)\n",
152+
" val words: JavaDStream<String> = lines.flatMap { it.split(\" \").iterator() }\n",
153+
"\n",
154+
" words.foreachRDD { rdd: JavaRDD<String>, _: Time ->\n",
155+
" withSpark(rdd) { // this: KSparkSession\n",
156+
" val dataframe: Dataset<TestRow> = rdd.map { TestRow(it) }.toDS()\n",
157+
" dataframe\n",
158+
" .groupByKey { it.word }\n",
159+
" .count()\n",
160+
" .show()\n",
161+
" }\n",
162+
" }\n",
163+
"}"
164+
],
165+
"metadata": {
166+
"collapsed": false,
167+
"pycharm": {
168+
"name": "#%%\n"
169+
}
170+
}
171+
}
172+
],
173+
"metadata": {
174+
"kernelspec": {
175+
"display_name": "Kotlin",
176+
"language": "kotlin",
177+
"name": "kotlin"
178+
},
179+
"language_info": {
180+
"name": "kotlin",
181+
"version": "1.7.0-dev-1825",
182+
"mimetype": "text/x-kotlin",
183+
"file_extension": ".kt",
184+
"pygments_lexer": "kotlin",
185+
"codemirror_mode": "text/x-kotlin",
186+
"nbconvert_exporter": ""
187+
}
188+
},
189+
"nbformat": 4,
190+
"nbformat_minor": 0
191+
}

0 commit comments

Comments
 (0)