@@ -41,63 +41,6 @@ internal class SparkStreamingIntegration : Integration() {
 
     @Language("kts")
     val _1 = listOf(
-        // For when onInterrupt is implemented in the Jupyter kernel
-        // """
-        //     val sscCollection = mutableSetOf<JavaStreamingContext>()
-        // """.trimIndent(),
-        // """
-        //     @JvmOverloads
-        //     fun withSparkStreaming(
-        //         batchDuration: Duration = Durations.seconds(1L),
-        //         checkpointPath: String? = null,
-        //         hadoopConf: Configuration = SparkHadoopUtil.get().conf(),
-        //         createOnError: Boolean = false,
-        //         props: Map<String, Any> = emptyMap(),
-        //         master: String = SparkConf().get("spark.master", "local[*]"),
-        //         appName: String = "Kotlin Spark Sample",
-        //         timeout: Long = -1L,
-        //         startStreamingContext: Boolean = true,
-        //         func: KSparkStreamingSession.() -> Unit,
-        //     ) {
-        //
-        //         // will only be set when a new context is created
-        //         var kSparkStreamingSession: KSparkStreamingSession? = null
-        //
-        //         val creatingFunc = {
-        //             val sc = SparkConf()
-        //                 .setAppName(appName)
-        //                 .setMaster(master)
-        //                 .setAll(
-        //                     props
-        //                         .map { (key, value) -> key X value.toString() }
-        //                         .asScalaIterable()
-        //                 )
-        //
-        //             val ssc = JavaStreamingContext(sc, batchDuration)
-        //             ssc.checkpoint(checkpointPath)
-        //
-        //             kSparkStreamingSession = KSparkStreamingSession(ssc)
-        //             func(kSparkStreamingSession!!)
-        //
-        //             ssc
-        //         }
-        //
-        //         val ssc = when {
-        //             checkpointPath != null ->
-        //                 JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)
-        //
-        //             else -> creatingFunc()
-        //         }
-        //         sscCollection += ssc
-        //
-        //         if (startStreamingContext) {
-        //             ssc.start()
-        //             kSparkStreamingSession?.invokeRunAfterStart()
-        //         }
-        //         ssc.awaitTerminationOrTimeout(timeout)
-        //         ssc.stop()
-        //     }
-        // """.trimIndent(),
         """
             println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""".trimIndent(),
     ).map(::execute)
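
The startup message kept above points users at `withSparkStreaming { }`. A minimal notebook-cell sketch of that call, assuming the kotlin-spark-api streaming DSL is on the classpath and that `KSparkStreamingSession` exposes its `JavaStreamingContext` as `ssc` (as the `KSparkStreamingSession(ssc)` constructor call in the removed block suggests); the host/port and the 10-second timeout are purely illustrative:

// In a notebook cell (the import may already be provided by the integration):
import org.apache.spark.streaming.Durations

withSparkStreaming(
    batchDuration = Durations.seconds(1L), // micro-batch interval
    appName = "Kotlin Spark Sample",
    timeout = 10_000L,                     // stop after 10 s instead of running indefinitely
) {
    // Inside the block, `this` is a KSparkStreamingSession.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd, _ -> rdd.take(10).forEach(::println) }
}
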
@@ -106,22 +49,4 @@ internal class SparkStreamingIntegration : Integration() {
     override fun KotlinKernelHost.onShutdown() = Unit
 
     override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
-
-    // For when this feature is implemented in the Jupyter kernel
-    // override fun KotlinKernelHost.onInterrupt() {
-    //
-    //     @Language("kts")
-    //     val _1 = listOf(
-    //         """
-    //             while (sscCollection.isNotEmpty())
-    //                 sscCollection.first().let {
-    //                     it.stop()
-    //                     sscCollection.remove(it)
-    //                 }
-    //         """.trimIndent(),
-    //         """
-    //             println("onInterrupt cleanup!")
-    //         """.trimIndent()
-    //     ).map(::execute)
-    // }
 }
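
For reference, the deleted `onInterrupt` override boils down to the following cleanup routine, sketched here as plain Kotlin outside the kernel API; `sscCollection` mirrors the registry declared in the first removed snippet:

import org.apache.spark.streaming.api.java.JavaStreamingContext

val sscCollection = mutableSetOf<JavaStreamingContext>()

fun stopAllStreamingContexts() {
    // Drain the registry, stopping each tracked streaming context.
    while (sscCollection.isNotEmpty()) {
        sscCollection.first().let {
            it.stop()                // by default this also stops the underlying SparkContext
            sscCollection.remove(it)
        }
    }
    println("onInterrupt cleanup!")
}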