Skip to content

Commit 8605a15

Browse files
Merge pull request #19 from sgrG24/master
Upgrading Scala and Spark versions.
2 parents 6f6a360 + 0dc0c3b commit 8605a15

5 files changed

Lines changed: 74 additions & 58 deletions

File tree

.travis.yml

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,34 @@
11
language: scala
22
scala:
3-
- 2.10.4
4-
- 2.11.11
5-
jdk: oraclejdk8
3+
- 2.11.11
4+
- 2.12.11
5+
jdk: openjdk8
66
sudo: false
77
cache:
88
directories:
9-
- "$HOME/.ivy2/cache"
10-
- "$HOME/.sbt"
9+
- "$HOME/.ivy2/cache"
10+
- "$HOME/.sbt"
1111
before_cache:
12-
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
13-
- find $HOME/.sbt -name "*.lock" -print -delete
12+
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
13+
- find $HOME/.sbt -name "*.lock" -print -delete
1414
env:
1515
global:
16-
- BUILD_LABEL=1.0.${TRAVIS_BUILD_NUMBER}
17-
- secure: pQPYXlJp86IIELAHf48ri2Q7oDISCgA137fT9wTiozlu+5YjwY9wUa+OEbDvKG1RBeqK5aWFP8KWRfoD1CEpTeLg8h+ElvX+JRhUuQS8GaX1Ev4i79YOxnLP1d/K74SEpqM3dWpacuZzP2TlYv9nGSuxC7U2PpxP54ShjBPeaAw3G7lES/vFPIdVhDXjNEjwpjfl/Fy9AanBf1f7souX/7GXVmdvg+NIDkp0ALryD7hfz38HYEd6B1RrLqqNgVFqrbSoh9bmPA9BXt8gjytEAaKkb3hB7X5Sl0Yw3xwaZzNEkxwZSSP/Z80vUJZpwAnr3TAc54yiVKx64N0CoH/kzv7FGX2fn5P4tFmwEYnsqZZvJXQ64fILHOzqiTKho2qSXA1DRsd6RD/DUME24PEpmZPmFCj7ujNuc6L15oFqIU4mSiog6HJFn8dXZAWHWJNgfgipbe/7jT8PwCigMtFsrSPgiKau39rZ3wL9jmhzUGqsGEWaWapuXdcyrG+Erzngh7Z5VI/c6Z7OLQ3zIpDBFXYenB6PV+gFqvH8Cafmb08h85a3Jnp4Va8ml9GJlTf1aNgaITSxzCEQO1RZZgrwS5Z+fu4N9J8hFWk/mou3r9nOazJ+wkGCcudQaBhl70gHluM09yaFRF5+7NjwcTMvRmdGkfA1tCnwsF/xVghci2I=
18-
- secure: akbuJ5hN9AWCL+pdsgxV6dUtg318SWK3TgUg9mJ/3ix7L6aNQWb670bYJHA9VCGxX4glzRdzJttu+7PVebbngPpdXTu6TAE3CabZbV0rarPqszvtRPBQmXtA/n36TO4VwHBJdCT3UaOxp2j3pocjqxHujZ1WPS5oUg/LY07HN6y/dgwC01j7CYK6Se7//QUs45KuKcyZkTCZO4qj4Mb+fyUehWoMaq0Ivlflj91CPr/QsIBw/y2zIKlQD/Z07EQx0fVwdhkcapH3iOj2dNoKO40Jx8B2/64OpHnH3w8ycA+QdTWgmrhGbJnhyA+WXVSPTyzIluSJFiseTJPuNH1fJ5sBlN27fPngiEEy9sFhRo/ia1riZwOB1iLqWWv1pquBJHHXYy/PwTWMee8trIjw6LVgTp4tBeLLqoHXJv5z6qubakRbqc8sxSVLXEV+DyxQJ0bcn3ywCTLikhABuRIJPfwK+ekrS3b5KLnw/9mbyaSa/UP2Ko7KAQtdjKeJz5MZ8yRTdO1PI804wUqJKjLQyka9FXY3FvbO8vjkGO1uK7nny19NRDwB65nMD/HywhA2NwHNnyzlpFtw3ZCIJ/qUNW0Qnp34YYBAAYHbq/nTtOklBcPDB3qYW5WA59nPqeU+1RzMZOnYHNKEl5ahtdlituKSy/RrwTh9zMH5eQsGiYA=
19-
- secure: FR3Aq3XQ4mWH8ie4Srcjf8oph7VBZF6JZ0ujHNb7DzPCWWc7X/tEf2Ae8DSpmb55PEui7/EMULmeaFXNbyIfVZ9weyCFBGEw7xZ2Ue6pF9T+bHz4KgNIPkbNAgxTrfxHqdTXcackDR/qSU40+hEOWzrgHYAegIWm7iGIIHeFnuKc3Ee7OK1iF5jN5ujwU4fK4m4N/gstdHiFAJ0Or0gmy2+i+GauwjkDB73zOIakjyV/TYwI4CJqqXEbXtc2rT4uOXGpPhgc69OmJplr9oIEn6FKzqLJmM+kmZPmuMp0cT2I8ga6Q4xtAeWj47Z+h0LbU27uqK4LkwDR5ucukRaGDB750WG3c+216m9xJCom63yn2hmUOmkiWU6lKqR8DeE0DqwiGjN/Q6Vr1z+oQ5EeSxAT4+J2xpDOhDkceyx5DpZgXOMyk7y0Y+78KdQVUouVczJFqCubMzenGR7rNzn9CHMkBpg07+14MXKqz7XQIPUB74Q8qwu44JH6rl5kGnf9bgcoJi7xVI25wse0wmvBdkGw7GQQsjq9WCX6pzf6zz40g9nnNGbAGCtgNaANX3pvGp+V5cczv2hwW9fP0/RnZd+lCyA+zT+n2qY8jCEDBogMi4+cMNHVIQ35hHkdlhHdwdFjsTJoKsuXCS6YZZD1B/GQmCPBJcHH3X+HH9pdGpU=
20-
- secure: TTmBqJ0U2DrADusK6GYCGQKytnCJUqeqS+2XuE+TRp3pECdpx/arSrGit5j3oBzr2yAiakk/6N6JtWEZU7ftYaOxG7ga1Chs9Bno5T+uL67uFWkqrwxvg4AYgTUXsO+Xel7V1YmiNrXctQeIILEMB7H8OaLc6/cis3laMxY/drwexJjmf249boqfHKJuv+kHwaqfjikGO3uCLZqQsDLgzRSVH/yNvPmgUu/DZXmF6uHcfGqQRra7/VfhzFvXqAhTriYhfUyzKgouWerlcmgscN8uS9SSF8j2W+iXvR1DXyQbbf9/OV7kp+NLVTPg1wvL/RyOiQC8cMTFkdcD0lQ5xl6BBKorFPltk2Zh0Qt7syd7MJ6AMRBjoGckya6EPfTjW3rh0Pr/UDuqrVVqrP0xjgp10LUB72oW5QSDNZPXNdXgj+x9ugYtLZJIes7+yYF7ALlm7R/0RlNNiIlA6HHdOrM3Fx8BY9MHWTtqYGNxZ3yZsYZVdCk2aOyGS5o9DVXLFWWuxDFwsjokA/zc5PstIsBbyJvBqCrfoZ8YQ6b31KcdXKsN+vF+3jJP4p8nDvvz9zeIVYWGYSYrNCpRfZtLWyIyXOSDpfJ+iEgTMlxgyWuZd6dIQ3TdgcqwQhYnF5z/Nl7GgpraIvT4ttaaJkInSm6rgEA9PZr573nIWY10p28=
16+
- BUILD_LABEL=1.0.${TRAVIS_BUILD_NUMBER}
17+
- secure: pQPYXlJp86IIELAHf48ri2Q7oDISCgA137fT9wTiozlu+5YjwY9wUa+OEbDvKG1RBeqK5aWFP8KWRfoD1CEpTeLg8h+ElvX+JRhUuQS8GaX1Ev4i79YOxnLP1d/K74SEpqM3dWpacuZzP2TlYv9nGSuxC7U2PpxP54ShjBPeaAw3G7lES/vFPIdVhDXjNEjwpjfl/Fy9AanBf1f7souX/7GXVmdvg+NIDkp0ALryD7hfz38HYEd6B1RrLqqNgVFqrbSoh9bmPA9BXt8gjytEAaKkb3hB7X5Sl0Yw3xwaZzNEkxwZSSP/Z80vUJZpwAnr3TAc54yiVKx64N0CoH/kzv7FGX2fn5P4tFmwEYnsqZZvJXQ64fILHOzqiTKho2qSXA1DRsd6RD/DUME24PEpmZPmFCj7ujNuc6L15oFqIU4mSiog6HJFn8dXZAWHWJNgfgipbe/7jT8PwCigMtFsrSPgiKau39rZ3wL9jmhzUGqsGEWaWapuXdcyrG+Erzngh7Z5VI/c6Z7OLQ3zIpDBFXYenB6PV+gFqvH8Cafmb08h85a3Jnp4Va8ml9GJlTf1aNgaITSxzCEQO1RZZgrwS5Z+fu4N9J8hFWk/mou3r9nOazJ+wkGCcudQaBhl70gHluM09yaFRF5+7NjwcTMvRmdGkfA1tCnwsF/xVghci2I=
18+
- secure: akbuJ5hN9AWCL+pdsgxV6dUtg318SWK3TgUg9mJ/3ix7L6aNQWb670bYJHA9VCGxX4glzRdzJttu+7PVebbngPpdXTu6TAE3CabZbV0rarPqszvtRPBQmXtA/n36TO4VwHBJdCT3UaOxp2j3pocjqxHujZ1WPS5oUg/LY07HN6y/dgwC01j7CYK6Se7//QUs45KuKcyZkTCZO4qj4Mb+fyUehWoMaq0Ivlflj91CPr/QsIBw/y2zIKlQD/Z07EQx0fVwdhkcapH3iOj2dNoKO40Jx8B2/64OpHnH3w8ycA+QdTWgmrhGbJnhyA+WXVSPTyzIluSJFiseTJPuNH1fJ5sBlN27fPngiEEy9sFhRo/ia1riZwOB1iLqWWv1pquBJHHXYy/PwTWMee8trIjw6LVgTp4tBeLLqoHXJv5z6qubakRbqc8sxSVLXEV+DyxQJ0bcn3ywCTLikhABuRIJPfwK+ekrS3b5KLnw/9mbyaSa/UP2Ko7KAQtdjKeJz5MZ8yRTdO1PI804wUqJKjLQyka9FXY3FvbO8vjkGO1uK7nny19NRDwB65nMD/HywhA2NwHNnyzlpFtw3ZCIJ/qUNW0Qnp34YYBAAYHbq/nTtOklBcPDB3qYW5WA59nPqeU+1RzMZOnYHNKEl5ahtdlituKSy/RrwTh9zMH5eQsGiYA=
19+
- secure: FR3Aq3XQ4mWH8ie4Srcjf8oph7VBZF6JZ0ujHNb7DzPCWWc7X/tEf2Ae8DSpmb55PEui7/EMULmeaFXNbyIfVZ9weyCFBGEw7xZ2Ue6pF9T+bHz4KgNIPkbNAgxTrfxHqdTXcackDR/qSU40+hEOWzrgHYAegIWm7iGIIHeFnuKc3Ee7OK1iF5jN5ujwU4fK4m4N/gstdHiFAJ0Or0gmy2+i+GauwjkDB73zOIakjyV/TYwI4CJqqXEbXtc2rT4uOXGpPhgc69OmJplr9oIEn6FKzqLJmM+kmZPmuMp0cT2I8ga6Q4xtAeWj47Z+h0LbU27uqK4LkwDR5ucukRaGDB750WG3c+216m9xJCom63yn2hmUOmkiWU6lKqR8DeE0DqwiGjN/Q6Vr1z+oQ5EeSxAT4+J2xpDOhDkceyx5DpZgXOMyk7y0Y+78KdQVUouVczJFqCubMzenGR7rNzn9CHMkBpg07+14MXKqz7XQIPUB74Q8qwu44JH6rl5kGnf9bgcoJi7xVI25wse0wmvBdkGw7GQQsjq9WCX6pzf6zz40g9nnNGbAGCtgNaANX3pvGp+V5cczv2hwW9fP0/RnZd+lCyA+zT+n2qY8jCEDBogMi4+cMNHVIQ35hHkdlhHdwdFjsTJoKsuXCS6YZZD1B/GQmCPBJcHH3X+HH9pdGpU=
20+
- secure: TTmBqJ0U2DrADusK6GYCGQKytnCJUqeqS+2XuE+TRp3pECdpx/arSrGit5j3oBzr2yAiakk/6N6JtWEZU7ftYaOxG7ga1Chs9Bno5T+uL67uFWkqrwxvg4AYgTUXsO+Xel7V1YmiNrXctQeIILEMB7H8OaLc6/cis3laMxY/drwexJjmf249boqfHKJuv+kHwaqfjikGO3uCLZqQsDLgzRSVH/yNvPmgUu/DZXmF6uHcfGqQRra7/VfhzFvXqAhTriYhfUyzKgouWerlcmgscN8uS9SSF8j2W+iXvR1DXyQbbf9/OV7kp+NLVTPg1wvL/RyOiQC8cMTFkdcD0lQ5xl6BBKorFPltk2Zh0Qt7syd7MJ6AMRBjoGckya6EPfTjW3rh0Pr/UDuqrVVqrP0xjgp10LUB72oW5QSDNZPXNdXgj+x9ugYtLZJIes7+yYF7ALlm7R/0RlNNiIlA6HHdOrM3Fx8BY9MHWTtqYGNxZ3yZsYZVdCk2aOyGS5o9DVXLFWWuxDFwsjokA/zc5PstIsBbyJvBqCrfoZ8YQ6b31KcdXKsN+vF+3jJP4p8nDvvz9zeIVYWGYSYrNCpRfZtLWyIyXOSDpfJ+iEgTMlxgyWuZd6dIQ3TdgcqwQhYnF5z/Nl7GgpraIvT4ttaaJkInSm6rgEA9PZr573nIWY10p28=
2121
before_deploy:
22-
- openssl aes-256-cbc -pass pass:$ENCRYPTION_PASSWORD -in secring.gpg.enc -out local.secring.gpg -d
23-
- openssl aes-256-cbc -pass pass:$ENCRYPTION_PASSWORD -in pubring.gpg.enc -out local.pubring.gpg -d
22+
- openssl aes-256-cbc -pass pass:$ENCRYPTION_PASSWORD -in secring.gpg.enc -out local.secring.gpg -d
23+
- openssl aes-256-cbc -pass pass:$ENCRYPTION_PASSWORD -in pubring.gpg.enc -out local.pubring.gpg -d
2424
deploy:
25-
- provider: script
26-
script: "./publish.sh"
27-
skip_cleanup: true
28-
on:
29-
tags: true
30-
jdk: oraclejdk8
31-
scala: 2.11.11
25+
- provider: script
26+
script: "./publish.sh"
27+
skip_cleanup: true
28+
on:
29+
tags: true
30+
jdk: openjdk8
31+
scala: 2.12.11
3232
notifications:
3333
slack:
3434
rooms:

build.sbt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ lazy val commonSettings = Seq(
88
organization := "com.indix",
99
organizationName := "Indix",
1010
organizationHomepage := Some(url("http://www.indix.com")),
11-
scalaVersion := "2.11.11",
11+
scalaVersion := "2.12.11",
1212
scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked"),
1313
javacOptions in (Compile, compile) ++= Seq("-Xlint:deprecation", "-source", "1.8"),
1414
javacOptions in (Compile, doc) ++= Seq("-source", "1.8"),
1515
resolvers ++= Seq(
16-
"Clojars" at "http://clojars.org/repo",
17-
"Concurrent Maven Repo" at "http://conjars.org/repo",
16+
"Clojars" at "https://clojars.org/repo",
17+
"Concurrent Maven Repo" at "https://conjars.org/repo",
1818
"Twttr Maven Repo" at "https://maven.twttr.com/"
1919
)
2020
)
@@ -93,7 +93,7 @@ lazy val storeUtils = (project in file("util-store")).
9393
libraryDependencies ++= Seq(
9494
"org.scalatest" %% "scalatest" % "3.0.3" % Test,
9595
"commons-io" % "commons-io" % "2.5",
96-
"com.twitter" %% "chill" % "0.8.1",
96+
"com.twitter" %% "chill" % "0.9.5",
9797
"org.rocksdb" % "rocksdbjni" % "4.11.2"
9898
)
9999
)
@@ -104,19 +104,19 @@ lazy val sparkUtils = (project in file("util-spark")).
104104
settings(publishSettings: _*).
105105
settings(
106106
name := "util-spark",
107-
crossScalaVersions := Seq("2.10.6", "2.11.11"),
107+
crossScalaVersions := Seq("2.11.11", "2.12.11"),
108108
libraryDependencies ++= Seq(
109109
"org.scalatest" %% "scalatest" % "3.0.3" % Test,
110-
"org.apache.spark" %% "spark-core" % "2.2.0",
111-
"org.apache.spark" %% "spark-sql" % "2.2.0",
112-
"com.databricks" %% "spark-avro" % "4.0.0",
110+
"org.apache.spark" %% "spark-core" % "2.4.4",
111+
"org.apache.spark" %% "spark-sql" % "2.4.4",
112+
"org.apache.spark" %% "spark-avro" % "2.4.4",
113113
"org.apache.hadoop" % "hadoop-aws" % "2.6.0" % Test,
114114
"com.indix" % "dfs-datastores" % "2.0.21" excludeAll(
115115
ExclusionRule(organization = "org.apache.hadoop"),
116116
ExclusionRule(organization = "org.eclipse.jetty")
117117
),
118-
"org.apache.parquet" % "parquet-avro" % "1.8.1",
119-
"org.bdgenomics.utils" %% "utils-misc" % "0.2.13"
118+
"org.apache.parquet" % "parquet-avro" % "1.10.1",
119+
"org.bdgenomics.utils" %% "utils-misc-spark2" % "0.2.16"
120120
)
121121
)
122122

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version = 0.13.17
1+
sbt.version = 0.13.18

util-spark/src/main/scala/com/indix/utils/spark/parquet/avro/ParquetAvroDataSource.scala

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import java.nio.ByteBuffer
44
import java.sql.Timestamp
55
import java.util
66

7-
import com.databricks.spark.avro.SchemaConverters
7+
import org.apache.spark.sql.avro.SchemaConverters
88
import org.apache.avro.generic.GenericData.Record
99
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
1010
import org.apache.avro.{Schema, SchemaBuilder}
@@ -23,18 +23,19 @@ import scala.reflect.ClassTag
2323

2424
class AvroFormatter extends Serializable {
2525
/**
26-
* This function constructs converter function for a given sparkSQL datatype. This is used in
27-
* writing Avro records out to disk
28-
*/
26+
* This function constructs converter function for a given sparkSQL datatype. This is used in
27+
* writing Avro records out to disk
28+
*/
2929
def createConverterToAvro(
30-
dataType: DataType,
31-
structName: String,
32-
recordNamespace: String): (Any) => Any = {
30+
dataType: DataType,
31+
structName: String,
32+
recordNamespace: String): (Any) => Any = {
3333
dataType match {
34-
case BinaryType => (item: Any) => item match {
35-
case null => null
36-
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
37-
}
34+
case BinaryType => (item: Any) =>
35+
item match {
36+
case null => null
37+
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
38+
}
3839
case ByteType | ShortType | IntegerType | LongType |
3940
FloatType | DoubleType | StringType | BooleanType => identity
4041
case _: DecimalType => (item: Any) => if (item == null) null else item.toString
@@ -71,9 +72,8 @@ class AvroFormatter extends Serializable {
7172
}
7273
}
7374
case structType: StructType =>
74-
val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
75-
val schema: Schema = SchemaConverters.convertStructToAvro(
76-
structType, builder, recordNamespace)
75+
val schema: Schema = SchemaConverters.toAvroType(
76+
structType,nullable = false, structName, recordNamespace)
7777
val fieldConverters = structType.fields.map(field =>
7878
createConverterToAvro(field.dataType, field.name, recordNamespace))
7979
(item: Any) => {
@@ -115,7 +115,7 @@ trait ParquetAvroDataSource {
115115
AvroParquetOutputFormat.setSchema(job, schema)
116116
ParquetOutputFormat.setCompression(job, compression)
117117
ParquetOutputFormat.setPageSize(job, 32 * 1024 * 1024) // 128 MB seems way too large
118-
val outputFormatter = if(useDFOC) classOf[ParquetAvroOutputFormatWithDFOC] else classOf[AvroParquetOutputFormat[GenericRecord]]
118+
val outputFormatter = if (useDFOC) classOf[ParquetAvroOutputFormatWithDFOC] else classOf[AvroParquetOutputFormat[GenericRecord]]
119119
rdd.map(r => null.asInstanceOf[Void] -> r)
120120
.saveAsNewAPIHadoopFile(outputLocation, classOf[Void], classOf[IndexedRecord], outputFormatter, ContextUtil.getConfiguration(job))
121121
}
@@ -137,8 +137,7 @@ trait ParquetAvroDataSource {
137137

138138
def toAvroSchema(schema: StructType): Schema = {
139139
val recordNamespace = ""
140-
val build = SchemaBuilder.record("topLevelRecord").namespace(recordNamespace)
141-
SchemaConverters.convertStructToAvro(schema, build, recordNamespace)
140+
SchemaConverters.toAvroType(schema,nullable = false, "topLevelRecord", recordNamespace)
142141
}
143142

144143
}

util-spark/src/test/scala/com/indix/utils/spark/parquet/ParquetAvroDataSourceSpec.scala

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,30 @@ import java.io.File
55
import com.google.common.io.Files
66
import com.indix.utils.spark.parquet.avro.ParquetAvroDataSource
77
import org.apache.commons.io.FileUtils
8+
import org.apache.parquet.hadoop.metadata.CompressionCodecName
89
import org.apache.spark.sql.SparkSession
10+
import org.scalactic.Equality
11+
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, equal}
912
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
10-
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper}
11-
import org.apache.parquet.hadoop.metadata.CompressionCodecName
13+
import java.util.{Arrays => JArrays}
1214

13-
case class SampleAvroRecord(a: Int, b: String, c: Seq[String], d: Boolean, e: Double, f: collection.Map[String,String], g: Seq[Byte])
15+
case class SampleAvroRecord(a: Int, b: String, c: Seq[String], d: Boolean, e: Double, f: collection.Map[String, String], g: Array[Byte])
1416

1517
class ParquetAvroDataSourceSpec extends FlatSpec with BeforeAndAfterAll with ParquetAvroDataSource {
1618
private var spark: SparkSession = _
19+
implicit val sampleAvroRecordEq = new Equality[SampleAvroRecord] {
20+
override def areEqual(left: SampleAvroRecord, b: Any): Boolean = b match {
21+
case right: SampleAvroRecord =>
22+
left.a == right.a &&
23+
left.b == right.b &&
24+
Equality.default[Seq[String]].areEqual(left.c, right.c) &&
25+
left.d == right.d &&
26+
left.e == right.e &&
27+
Equality.default[collection.Map[String, String]].areEqual(left.f, right.f) &&
28+
JArrays.equals(left.g, right.g)
29+
case _ => false
30+
}
31+
}
1732

1833
override protected def beforeAll(): Unit = {
1934
super.beforeAll()
@@ -33,11 +48,11 @@ class ParquetAvroDataSourceSpec extends FlatSpec with BeforeAndAfterAll with Par
3348
val outputLocation = Files.createTempDir().getAbsolutePath + "/output"
3449

3550
val sampleRecords: Seq[SampleAvroRecord] = Seq(
36-
SampleAvroRecord(1, "1", List("a1"), true, 1.0d, Map("a1" -> "b1"), Seq("1".toByte)),
37-
SampleAvroRecord(2, "2", List("a2"), false, 2.0d, Map("a2" -> "b2"), Seq("2".toByte)),
38-
SampleAvroRecord(3, "3", List("a3"), true, 3.0d, Map("a3" -> "b3"), Seq("3".toByte)),
39-
SampleAvroRecord(4, "4", List("a4"), true, 4.0d, Map("a4" -> "b4"), Seq("4".toByte)),
40-
SampleAvroRecord(5, "5", List("a5"), false, 5.0d, Map("a5" -> "b5"), Seq("5".toByte))
51+
SampleAvroRecord(1, "1", List("a1"), true, 1.0d, Map("a1" -> "b1"), "1".getBytes),
52+
SampleAvroRecord(2, "2", List("a2"), false, 2.0d, Map("a2" -> "b2"), "2".getBytes),
53+
SampleAvroRecord(3, "3", List("a3"), true, 3.0d, Map("a3" -> "b3"), "3".getBytes),
54+
SampleAvroRecord(4, "4", List("a4"), true, 4.0d, Map("a4" -> "b4"), "4".getBytes),
55+
SampleAvroRecord(5, "5", List("a5"), false, 5.0d, Map("a5" -> "b5"), "5".getBytes)
4156
)
4257

4358
val sampleDf = spark.createDataFrame(sampleRecords)
@@ -51,7 +66,9 @@ class ParquetAvroDataSourceSpec extends FlatSpec with BeforeAndAfterAll with Par
5166
val records: Array[SampleAvroRecord] = spark.read.parquet(outputLocation).as[SampleAvroRecord].collect()
5267

5368
records.length should be(5)
54-
records.sortBy(_.a) should be (sampleRecords.sortBy(_.a))
69+
// We use === to use the custom Equality defined above for comparing Array[Byte]
70+
// Ref - https://github.com/scalatest/scalatest/issues/491
71+
records.sortBy(_.a) === sampleRecords.sortBy(_.a)
5572

5673
FileUtils.deleteDirectory(new File(outputLocation))
5774
}

0 commit comments

Comments (0)