forked from databricks/tensorframes
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDevDataSet.sc
More file actions
60 lines (50 loc) · 1.75 KB
/
DevDataSet.sc
File metadata and controls
60 lines (50 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// -*- mode: scala -*-
/**
* Generic I/O utilities for constructing paths
*/
import java.nio.file.{Paths, Path, Files}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
object IOUtils {
  /** Implicit conversions so `String`s and `java.nio` `Path`s can be used where an [[FPath]] is expected. */
  object Implicits {
    implicit def string2fpath(s: String): FPath = FPath(Paths.get(s))
    implicit def path2fpath(p: Path): FPath = FPath(p)
    implicit def fpath2path(fp: FPath): Path = fp.fp
  }

  /**
   * Thin wrapper around `java.nio.file.Path` that adds a `/` join operator
   * and an existence check for concise path construction.
   */
  case class FPath(fp: Path) {
    import Implicits._
    // Materialized on first use; only needed for the existence check.
    private[this] lazy val underlyingFile = fp.toFile
    /** Appends `sub` to this path, e.g. `FPath.home / "data"`. */
    def /(sub: FPath): FPath = FPath(fp.resolve(sub.fp))
    /** True when the path exists on the local filesystem. */
    def exists: Boolean = underlyingFile.exists
    override def toString: String = fp.toString
  }

  object FPath {
    /** The current user's home directory (from the `user.home` system property). */
    val home: FPath = FPath(Paths.get(System.getProperty("user.home")))
  }
}
import IOUtils._, IOUtils.Implicits._
object ImageDataSrc {
  // Root directory holding the image data sets: ~/local/data/images
  val fpLocalData = FPath.home / "local" / "data" / "images"

  /**
   * Loads the image data set `name` as a cached DataFrame.
   *
   * Looks for `<name>.parquet` under [[fpLocalData]]. If the parquet copy does
   * not exist yet, the data set is materialized once from `<name>.json` (which
   * must exist) using an explicit schema, written out as parquet, and then the
   * parquet copy is read on this and all subsequent calls.
   *
   * @param name  base name of the data set (without extension)
   * @param spark active session used for reading and writing
   * @throws IllegalArgumentException if neither the parquet nor the JSON file exists
   */
  def load(name: String)(implicit spark: SparkSession): DataFrame = {
    val fpImages = fpLocalData / s"${name}.parquet"
    if (!fpImages.exists) {
      val fpJson = fpLocalData / s"${name}.json"
      // Fail fast with a diagnostic path instead of a bare requirement failure.
      require(fpJson.exists,
        s"Data set '${name}': neither ${fpImages} nor ${fpJson} exists")
      // Per-image struct: binary pixel data plus geometry/encoding metadata.
      val imgSchema = StructType(Seq(
        StructField("data", BinaryType, false),
        StructField("height", IntegerType, false),
        StructField("mode", StringType, false),
        StructField("nChannels", IntegerType, false),
        StructField("width", IntegerType, false)
      ))
      // Top-level record: one image, its labels, and an optional source URI.
      val schema = StructType(Seq(
        StructField("image", imgSchema, false),
        StructField("labels", ArrayType(StringType), false),
        StructField("uri", StringType, true)
      ))
      val df = spark.read.schema(schema).json(fpJson.toString)
      df.write.mode("overwrite").parquet(fpImages.toString)
    }
    // Cache: callers are expected to scan the data set repeatedly.
    spark.read.parquet(fpImages.toString).cache()
  }
}