Skip to content

Commit 1488cfc

Browse files
committed
Adds joins and cached operations
1 parent 845fe5c commit 1488cfc

File tree

8 files changed

+124
-19
lines changed

8 files changed

+124
-19
lines changed

.gradletasknamecache

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,20 @@ artifactoryPublish
9292
core:artifactoryPublish
9393
examples:artifactoryPublish
9494
kotlin-api:artifactoryPublish
95+
publish
96+
core:publish
97+
examples:publish
98+
kotlin-api:publish
99+
publishToMavenLocal
100+
core:publishToMavenLocal
101+
examples:publishToMavenLocal
102+
kotlin-api:publishToMavenLocal
95103
examples:knows
96104
examples:shadowJar
105+
uploadArchives
106+
core:uploadArchives
107+
examples:uploadArchives
108+
kotlin-api:uploadArchives
97109
check
98110
core:check
99111
examples:check
@@ -119,6 +131,9 @@ examples:extractModuleInfo
119131
kotlin-api:extractModuleInfo
120132
examples:inspectClassesForKotlinIC
121133
kotlin-api:inspectClassesForKotlinIC
134+
core:install
135+
examples:install
136+
kotlin-api:install
122137
examples:kotlinSourcesJar
123138
kotlin-api:kotlinSourcesJar
124139
examples:mainClasses
@@ -133,6 +148,8 @@ kotlin-api:processResources
133148
core:processTestResources
134149
examples:processTestResources
135150
kotlin-api:processTestResources
151+
examples:sourceJar
152+
kotlin-api:sourceJar
136153
examples:startScripts
137154
Pattern:
138155
Pattern:
@@ -231,8 +248,20 @@ artifactoryPublish
231248
artifactoryPublish
232249
artifactoryPublish
233250
artifactoryPublish
251+
publish
252+
publish
253+
publish
254+
publish
255+
publishToMavenLocal
256+
publishToMavenLocal
257+
publishToMavenLocal
258+
publishToMavenLocal
234259
knows
235260
shadowJar
261+
uploadArchives
262+
uploadArchives
263+
uploadArchives
264+
uploadArchives
236265
check
237266
check
238267
check
@@ -258,6 +287,9 @@ extractModuleInfo
258287
extractModuleInfo
259288
inspectClassesForKotlinIC
260289
inspectClassesForKotlinIC
290+
install
291+
install
292+
install
261293
kotlinSourcesJar
262294
kotlinSourcesJar
263295
mainClasses
@@ -272,4 +304,6 @@ processResources
272304
processTestResources
273305
processTestResources
274306
processTestResources
307+
sourceJar
308+
sourceJar
275309
startScripts

build.gradle

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
buildscript {
2-
repositories {
3-
maven {
4-
url 'https://repo.labs.intellij.net/central-proxy'
5-
}
6-
}
7-
dependencies {
8-
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.15.2"
9-
}
10-
}
111
plugins {
122
id 'org.jetbrains.kotlin.jvm' version '1.3.72' apply false
3+
id 'maven-publish'
134
id 'maven'
5+
id "com.jfrog.artifactory" version "4.15.2"
146
}
157

168
allprojects {
179
group = 'org.jetbrains.kotlin.spark'
18-
version = '0.0.1'
10+
version = '0.0.2'
1911
apply plugin: "com.jfrog.artifactory"
12+
apply plugin: "maven-publish"
13+
apply plugin: "maven"
2014
}
2115

2216
subprojects {
@@ -36,6 +30,9 @@ artifactory {
3630
repoKey = 'big-data-ide'
3731
maven = true
3832
}
33+
defaults {
34+
publishConfigs('archives', 'published')
35+
}
3936
}
4037
resolve {
4138
repository {

examples/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,8 @@ compileTestKotlin {
2525
}
2626
}
2727

28+
// Builds a jar containing the sources of this module's main source set.
task sourceJar(type: Jar) {
    // allSource instead of allJava: allJava only contains *.java files, so the Kotlin
    // sources of this module would be left out of the jar.
    // NOTE(review): no classifier is set, so the archive name may collide with the
    // regular `jar` task output — confirm whether a 'sources' classifier is wanted.
    from sourceSets.main.allSource
}
31+
32+
artifactoryPublish.dependsOn sourceJar
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.jetbrains.spark.api.examples
2+
3+
import org.jetbrains.spark.api.*
4+
5+
/**
 * Example: builds a small dataset of pairs, prints it and its even-keyed subset while the
 * dataset is cached, then maps the filtered pairs to triples and prints those.
 */
fun main() {
    withSpark {
        val pairs = dsOf(1, 2, 3, 4, 5).map { n -> n to (n + 2) }

        // Both show calls inside the block run against the cached dataset.
        val evenKeyed = pairs.withCached {
            showDS()

            filter { pair -> pair.first % 2 == 0 }.showDS()
        }

        evenKeyed
            .map { pair -> c(pair.first, pair.second, (pair.first + pair.second) * 2) }
            .show()
    }
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.jetbrains.spark.api.examples
2+
3+
import org.apache.spark.sql.Dataset
4+
import org.jetbrains.spark.api.*
5+
6+
/** Minimum length a trimmed token must have to be kept by [clear]. */
const val MEANINGFUL_WORD_LENGTH = 4
7+
8+
/**
 * Example: word-frequency count over a text file.
 *
 * Reads the file line by line, splits on whitespace, cleans the tokens with [clear],
 * counts occurrences per word and prints the 20 most frequent ones as (count, word).
 * The input path is hard-coded.
 */
fun main() {
    withSpark {
        val words = spark
            .read()
            .textFile("/home/finkel/voina-i-mir.txt")
            .map { line -> line.split(Regex("\\s")) }
            .flatten()
            .clear()

        // Pairs of (word, occurrences), most frequent first, capped at 20 rows.
        val mostFrequent = words
            .groupByKey { word -> word }
            .mapGroups { word, occurrences -> word to occurrences.asSequence().count() }
            .sort { counted -> arrayOf(counted.col("second").desc()) }
            .limit(20)

        mostFrequent
            .map { entry -> entry.second to entry.first }
            .show(false)
    }
}
25+
26+
/**
 * Cleans a dataset of raw tokens: drops blank entries, trims a fixed set of punctuation and
 * whitespace characters from both ends of each token, and keeps only tokens whose length is
 * at least [MEANINGFUL_WORD_LENGTH].
 */
fun Dataset<String>.clear() =
27+
filter { it.isNotBlank() }
28+
.map { it.trim(',', ' ', '\n', ':', '.', ';', '?', '!', '"', '\'', '\t', ' ') }
29+
.filter { it.length >= MEANINGFUL_WORD_LENGTH }

kotlin-api/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,8 @@ test {
3030
useJUnitPlatform()
3131
}
3232

33+
// Builds a jar containing the sources of this module's main source set.
task sourceJar(type: Jar) {
    // allSource instead of allJava: allJava only contains *.java files, so the Kotlin
    // sources of this module would be left out of the jar.
    // NOTE(review): no classifier is set, so the archive name may collide with the
    // regular `jar` task output — confirm whether a 'sources' classifier is wanted.
    from sourceSets.main.allSource
}
36+
37+
artifactoryPublish.dependsOn sourceJar

kotlin-api/src/main/kotlin/org/jetbrains/spark/api/ApiV1.kt

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1+
@file:Suppress("HasPlatformType", "unused")
2+
13
package org.jetbrains.spark.api
24

3-
import org.apache.spark.api.java.function.FlatMapFunction
4-
import org.apache.spark.api.java.function.ForeachFunction
5-
import org.apache.spark.api.java.function.MapFunction
6-
import org.apache.spark.api.java.function.MapGroupsFunction
7-
import org.apache.spark.api.java.function.ReduceFunction
5+
import org.apache.spark.api.java.function.*
86
import org.apache.spark.sql.*
97
import org.apache.spark.sql.Encoders.*
108
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
119
import org.apache.spark.sql.types.*
12-
import org.jetbrains.spark.extensions.KSparkExtensions
1310
import scala.reflect.ClassTag
1411
import java.math.BigDecimal
1512
import java.sql.Date
@@ -96,7 +93,7 @@ fun <KEY, VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduceGroups(func: (VALUE, V
9693
reduceGroups(ReduceFunction(func))
9794
.map { t -> t._1 to t._2 }
9895

99-
inline fun <reified R> Dataset<Row>.upcast(): Dataset<R> = `as`(genericRefEncoder<R>())
96+
/**
 * Re-types this dataset as a `Dataset<R>` by calling `as` with the encoder returned by
 * `genericRefEncoder<R>()`.
 */
inline fun <T, reified R> Dataset<T>.downcast(): Dataset<R> = `as`(genericRefEncoder<R>())
10097

10198
/** Passes [func], wrapped in a [ForeachFunction], to `foreach` on this dataset. */
inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func))
10299

@@ -112,13 +109,34 @@ fun <T, R> Dataset<T>.cached(func: (Dataset<T>) -> R): R {
112109
/** Returns the result of calling the column method `$eq$eq$eq` (JVM name of Scala's `===`) with [c]. */
fun Column.eq(c: Column) = this.`$eq$eq$eq`(c)
113110

114111
/** Infix form of the column method `$eq$eq$eq` (JVM name of Scala's `===`): ``a `==` b``. */
infix fun Column.`==`(c: Column) = `$eq$eq$eq`(c)
112+
/** Infix alias for `and(c)` on this column: ``a `&&` b``. */
infix fun Column.`&&`(c: Column) = and(c)
113+
/** Shortcut that forwards [a] to `functions.lit`. */
fun lit(a: Any) = functions.lit(a)
115114

116115
/**
 * Joins this dataset with [right] on the condition [col] using join type `"left"`.
 *
 * Delegates to `joinWith` and converts every resulting tuple into a Kotlin [Pair];
 * the right-hand component is typed as nullable.
 */
inline fun <reified L, reified R : Any?> Dataset<L>.leftJoin(right: Dataset<R>, col: Column): Dataset<Pair<L, R?>> =
    joinWith(right, col, "left").map { joined -> joined._1 to joined._2 }
119118

119+
/**
 * Joins this dataset with [right] on the condition [col] using join type `"right"`.
 *
 * Delegates to `joinWith` and converts every resulting tuple into a Kotlin [Pair];
 * the left-hand component is typed as nullable.
 */
inline fun <reified L : Any?, reified R> Dataset<L>.rightJoin(right: Dataset<R>, col: Column): Dataset<Pair<L?, R>> =
    joinWith(right, col, "right").map { joined -> joined._1 to joined._2 }
122+
123+
/**
 * Joins this dataset with [right] on the condition [col] using join type `"inner"`.
 *
 * Delegates to `joinWith` and converts every resulting tuple into a Kotlin [Pair].
 */
inline fun <reified L, reified R> Dataset<L>.innerJoin(right: Dataset<R>, col: Column): Dataset<Pair<L, R>> =
    joinWith(right, col, "inner").map { joined -> joined._1 to joined._2 }
126+
127+
/**
 * Joins this dataset with [right] on the condition [col] using join type `"full"`.
 *
 * Delegates to `joinWith` and converts every resulting tuple into a Kotlin [Pair];
 * both components are typed as nullable.
 */
inline fun <reified L : Any?, reified R : Any?> Dataset<L>.fullJoin(right: Dataset<R>, col: Column): Dataset<Pair<L?, R?>> =
    joinWith(right, col, "full").map { joined -> joined._1 to joined._2 }
130+
120131
/**
 * Sorts this dataset by the columns that [columns] returns when invoked with this dataset;
 * the returned array is spread into the vararg `sort` overload.
 */
inline fun <reified T> Dataset<T>.sort(columns: (Dataset<T>) -> Array<Column>) = sort(*columns(this))
121132

133+
/**
 * Caches this dataset, runs [executeOnCached] with the cached dataset as receiver, and
 * unpersists the dataset afterwards.
 *
 * The dataset is unpersisted even when [executeOnCached] throws, so a failing block does
 * not leave it cached.
 *
 * NOTE(review): if the block returns a dataset derived from the cached one, actions run on
 * that result after this function returns presumably no longer benefit from the cache —
 * confirm against Spark's unpersist semantics.
 *
 * @param blockingUnpersist forwarded to `unpersist`.
 * @param executeOnCached block to run against the cached dataset.
 * @return whatever [executeOnCached] returns.
 */
inline fun <reified T, R> Dataset<T>.withCached(blockingUnpersist: Boolean = false, executeOnCached: Dataset<T>.() -> R): R {
    val cached = this.cache()
    return try {
        cached.executeOnCached()
    } finally {
        // Release the cache on both the normal and the exceptional path.
        cached.unpersist(blockingUnpersist)
    }
}
137+
138+
/**
 * Calls `show(numRows, truncate)` on this dataset and returns the same dataset, so the
 * call can sit in the middle of a chain.
 *
 * @param numRows forwarded to `show`.
 * @param truncate forwarded to `show`.
 * @return this dataset.
 */
fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true): Dataset<T> {
    show(numRows, truncate)
    return this
}
139+
122140
fun schema(type: KType, map: Map<String, KType> = mapOf()): DataType {
123141
val primitiveSchema = knownDataTypes[type.classifier]
124142
if (primitiveSchema != null) return KSimpleTypeWrapper(primitiveSchema, (type.classifier!! as KClass<*>).java, type.isMarkedNullable)

kotlin-api/src/main/kotlin/org/jetbrains/spark/api/SparkHelper.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ inline fun withSpark(props: Map<String, Any> = emptyMap(), master: String = "loc
2323
.also { it.stop() }
2424
}
2525

26-
@Suppress("EXPERIMENTAL_FEATURE_WARNING")
26+
@Suppress("EXPERIMENTAL_FEATURE_WARNING", "unused")
2727
inline class KSparkSession(val spark: SparkSession) {
2828
inline fun <reified T> List<T>.toDS() = toDS(spark)
2929
inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)

0 commit comments

Comments
 (0)