From 67cf5fdcdbf726c357f8e1ce31d36f7bbffb00aa Mon Sep 17 00:00:00 2001
From: Ben Lee Rodgers
Date: Wed, 10 Feb 2016 17:59:13 -0500
Subject: [PATCH 1/3] Change a few access modifiers to improve extensibility.

Currently a lot of functionality is hidden behind private access modifiers,
making it difficult to extend the parsers without copying large chunks of
code. These small changes make reuse much easier.
---
 src/main/scala/com/databricks/spark/csv/CsvRelation.scala     | 4 ++--
 src/main/scala/com/databricks/spark/csv/readers/readers.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index c2cd955..5ffb396 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -213,7 +213,7 @@ case class CsvRelation protected[spark] (
     }
   }
 
-  private def inferSchema(): StructType = {
+  protected def inferSchema(): StructType = {
     if (this.userSchema != null) {
       userSchema
     } else {
@@ -265,7 +265,7 @@ case class CsvRelation protected[spark] (
     }
   }
 
-  private def univocityParseCSV(
+  protected def univocityParseCSV(
       file: RDD[String],
       header: Seq[String]): RDD[Array[String]] = {
     // If header is set, make sure firstLine is materialized before sending to executors.
diff --git a/src/main/scala/com/databricks/spark/csv/readers/readers.scala b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
index 9fb1212..40b90d1 100644
--- a/src/main/scala/com/databricks/spark/csv/readers/readers.scala
+++ b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
@@ -178,7 +178,7 @@ private[csv] class BulkCsvReader(
  * parsed and needs the newlines to be present
  * @param iter iterator over RDD[String]
  */
-private class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
+class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
 
   private var next: Long = 0
   private var length: Long = 0  // length of input so far

From be52806dbda0553a819770a3d623e5978c146664 Mon Sep 17 00:00:00 2001
From: Ben Lee Rodgers
Date: Thu, 11 Feb 2016 18:00:27 -0500
Subject: [PATCH 2/3] Further extensibility improvements.

Reinstated some of the access modifiers from the previous patch and instead
extracted the extensible code out of those methods into new overridable
methods. Added some hierarchy to the parsers/readers so they can be
switched out.
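As an illustration, a custom line parser can now be plugged in by
implementing the new LineReader trait. A minimal sketch, assuming the
patched library is on the classpath (TabLineReader is a hypothetical
example, not part of this patch):

    import com.databricks.spark.csv.readers.LineReader

    // Hypothetical reader that naively splits on tabs instead of going
    // through univocity. parseLine is the only abstract member; the
    // protected getReader helper keeps its default implementation.
    class TabLineReader extends LineReader {
      override def parseLine(line: String): Array[String] = line.split('\t')
    }

A relation with access to the protected members can then return it from
getLineReader() to switch parsers without copying any code.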
---
 .../databricks/spark/csv/CsvRelation.scala   | 47 ++++++++++++-------
 .../spark/csv/readers/readers.scala          | 27 ++++++++---
 .../databricks/spark/csv/util/TextFile.scala |  2 +-
 3 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index 5ffb396..b34e080 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{PrunedScan, BaseRelation, InsertableRelation, TableScan}
 import org.apache.spark.sql.types._
-import com.databricks.spark.csv.readers.{BulkCsvReader, LineCsvReader}
+import com.databricks.spark.csv.readers.{BulkCsvReader, LineCsvReader, BulkReader, LineReader}
 import com.databricks.spark.csv.util._
 
 case class CsvRelation protected[spark] (
@@ -213,19 +213,24 @@ case class CsvRelation protected[spark] (
     }
   }
 
-  protected def inferSchema(): StructType = {
+  protected def getLineReader(): LineReader = {
+    val escapeVal = if (escape == null) '\\' else escape.charValue()
+    val commentChar: Char = if (comment == null) '\0' else comment
+    val quoteChar: Char = if (quote == null) '\0' else quote
+
+    new LineCsvReader(
+      fieldSep = delimiter,
+      quote = quoteChar,
+      escape = escapeVal,
+      commentMarker = commentChar)
+  }
+
+  private def inferSchema(): StructType = {
     if (this.userSchema != null) {
       userSchema
     } else {
       val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-        val quoteChar: Char = if (quote == null) '\0' else quote
-        new LineCsvReader(
-          fieldSep = delimiter,
-          quote = quoteChar,
-          escape = escapeVal,
-          commentMarker = commentChar).parseLine(firstLine)
+        getLineReader().parseLine(firstLine)
       } else {
         val csvFormat = defaultCsvFormat
           .withDelimiter(delimiter)
@@ -265,7 +270,19 @@ case class CsvRelation protected[spark] (
     }
   }
 
-  protected def univocityParseCSV(
+  protected def getBulkReader(
+      header: Seq[String],
+      iter: Iterator[String], split: Int): BulkReader = {
+    val escapeVal = if (escape == null) '\\' else escape.charValue()
+    val commentChar: Char = if (comment == null) '\0' else comment
+    val quoteChar: Char = if (quote == null) '\0' else quote
+
+    new BulkCsvReader(iter, split,
+      headers = header, fieldSep = delimiter,
+      quote = quoteChar, escape = escapeVal, commentMarker = commentChar)
+  }
+
+  private def univocityParseCSV(
       file: RDD[String],
       header: Seq[String]): RDD[Array[String]] = {
     // If header is set, make sure firstLine is materialized before sending to executors.
@@ -273,13 +290,7 @@ case class CsvRelation protected[spark] (
     val dataLines = if (useHeader) file.filter(_ != filterLine) else file
     val rows = dataLines.mapPartitionsWithIndex({
       case (split, iter) => {
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-        val quoteChar: Char = if (quote == null) '\0' else quote
-
-        new BulkCsvReader(iter, split,
-          headers = header, fieldSep = delimiter,
-          quote = quoteChar, escape = escapeVal, commentMarker = commentChar)
+        getBulkReader(header, iter, split)
       }
     }, true)
 
diff --git a/src/main/scala/com/databricks/spark/csv/readers/readers.scala b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
index 40b90d1..dc4eaee 100644
--- a/src/main/scala/com/databricks/spark/csv/readers/readers.scala
+++ b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
@@ -22,6 +22,21 @@ import java.io.StringReader
 
 import com.univocity.parsers.csv._
 
+/**
+ * Allows for greater extensibility
+ */
+trait BulkReader extends Iterator[Array[String]] {
+  protected def getReader(iter: Iterator[String]) = new StringIteratorReader(iter)
+}
+
+/**
+ * Allows for greater extensibility
+ */
+trait LineReader {
+  protected def getReader(line: String) = new StringReader(line)
+  def parseLine(line: String): Array[String]
+}
+
 /**
  * Read and parse CSV-like input
  * @param fieldSep the delimiter used to separate fields in a line
@@ -97,14 +112,15 @@
     ignoreTrailingSpace,
     null,
     inputBufSize,
-    maxCols) {
+    maxCols)
+    with LineReader {
   /**
    * parse a line
    * @param line a String with no newline at the end
    * @return array of strings where each string is a field in the CSV record
    */
   def parseLine(line: String): Array[String] = {
-    parser.beginParsing(new StringReader(line))
+    parser.beginParsing(getReader(line))
     val parsed = parser.parseNext()
     parser.stopParsing()
     parsed
@@ -148,10 +164,9 @@
     headers,
     inputBufSize,
     maxCols)
-    with Iterator[Array[String]] {
+    with BulkReader {
 
-  private val reader = new StringIteratorReader(iter)
-  parser.beginParsing(reader)
+  parser.beginParsing(getReader(iter))
   private var nextRecord = parser.parseNext()
 
   /**
@@ -178,7 +193,7 @@ private[csv] class BulkCsvReader(
  * parsed and needs the newlines to be present
  * @param iter iterator over RDD[String]
  */
-class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
+private[readers] class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
 
   private var next: Long = 0
   private var length: Long = 0  // length of input so far
diff --git a/src/main/scala/com/databricks/spark/csv/util/TextFile.scala b/src/main/scala/com/databricks/spark/csv/util/TextFile.scala
index 3b8d6c6..2dde4d3 100644
--- a/src/main/scala/com/databricks/spark/csv/util/TextFile.scala
+++ b/src/main/scala/com/databricks/spark/csv/util/TextFile.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
-private[csv] object TextFile {
+object TextFile {
   val DEFAULT_CHARSET = Charset.forName("UTF-8")
 
   def withCharset(context: SparkContext, location: String, charset: String): RDD[String] = {

From 2c6522c56f6391cbd34bb530cea2e5ed366e03ce Mon Sep 17 00:00:00 2001
From: Ben Lee Rodgers
Date: Wed, 17 Feb 2016 11:16:27 -0500
Subject: [PATCH 3/3] Minor comment and naming changes

---
 .../com/databricks/spark/csv/readers/readers.scala | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)
diff --git a/src/main/scala/com/databricks/spark/csv/readers/readers.scala b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
index dc4eaee..1c81815 100644
--- a/src/main/scala/com/databricks/spark/csv/readers/readers.scala
+++ b/src/main/scala/com/databricks/spark/csv/readers/readers.scala
@@ -22,18 +22,12 @@ import java.io.StringReader
 
 import com.univocity.parsers.csv._
 
-/**
- * Allows for greater extensibility
- */
 trait BulkReader extends Iterator[Array[String]] {
-  protected def getReader(iter: Iterator[String]) = new StringIteratorReader(iter)
+  protected def reader(iter: Iterator[String]) = new StringIteratorReader(iter)
 }
 
-/**
- * Allows for greater extensibility
- */
 trait LineReader {
-  protected def getReader(line: String) = new StringReader(line)
+  protected def reader(line: String) = new StringReader(line)
   def parseLine(line: String): Array[String]
 }
 
@@ -120,7 +114,7 @@
    * @return array of strings where each string is a field in the CSV record
    */
   def parseLine(line: String): Array[String] = {
-    parser.beginParsing(getReader(line))
+    parser.beginParsing(reader(line))
     val parsed = parser.parseNext()
     parser.stopParsing()
     parsed
@@ -166,7 +160,7 @@
     maxCols)
     with BulkReader {
 
-  parser.beginParsing(getReader(iter))
+  parser.beginParsing(reader(iter))
   private var nextRecord = parser.parseNext()
 
   /**
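With the traits in their final shape, an alternative bulk parser only has to
satisfy the Iterator contract. A minimal sketch, assuming the patched
artifact is on the classpath (NaiveBulkReader is a hypothetical example, not
part of this series):

    import com.databricks.spark.csv.readers.BulkReader

    // Naive illustration: splits each line on a single delimiter, with no
    // quoting or escaping rules, just to show the extension seam.
    class NaiveBulkReader(iter: Iterator[String], fieldSep: Char)
        extends BulkReader {
      override def hasNext: Boolean = iter.hasNext
      override def next(): Array[String] = iter.next().split(fieldSep)
    }

Overriding CsvRelation.getBulkReader(header, iter, split) to return it is
then enough for univocityParseCSV to pick it up without further changes.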