Skip to content
Open
24 changes: 24 additions & 0 deletions native/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ pub enum SparkError {
spark_type: String,
},

/// A per-file read failure (corrupt footer/page, truncated/empty file, deleted file) raised by
/// the native parquet reader / object_store. Classified by typed `DataFusionError` variant (no
/// message matching) and translated by the JVM shim into Spark's `FAILED_READ_FILE`
/// (`QueryExecutionErrors.cannotReadFilesError`). `file_path` may be empty when the underlying
/// error doesn't carry it (only `object_store::Error::NotFound` does); the JVM side then fills
/// it from the per-task file list.
#[error("Encountered error while reading file {file_path}: {message}")]
CannotReadFile { file_path: String, message: String },

#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),

Expand Down Expand Up @@ -291,6 +300,7 @@ impl SparkError {
SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId",
SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds",
SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert",
SparkError::CannotReadFile { .. } => "CannotReadFile",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
Expand Down Expand Up @@ -528,6 +538,12 @@ impl SparkError {
"sparkType": spark_type,
})
}
SparkError::CannotReadFile { file_path, message } => {
serde_json::json!({
"filePath": file_path,
"message": message,
})
}
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
Expand Down Expand Up @@ -617,6 +633,10 @@ impl SparkError {
"org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException"
}

// CannotReadFile - converted to a FAILED_READ_FILE SparkException by the shim
// (QueryExecutionErrors.cannotReadFilesError).
SparkError::CannotReadFile { .. } => "org/apache/spark/SparkException",

// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
}
Expand Down Expand Up @@ -707,6 +727,10 @@ impl SparkError {
// SparkException error class, so no error class is exposed here.
SparkError::ParquetSchemaConvert { .. } => None,

// CannotReadFile — the JVM shim wraps it via cannotReadFilesError, which supplies the
// FAILED_READ_FILE error class, so none is exposed here.
SparkError::CannotReadFile { .. } => None,

// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
Expand Down
416 changes: 378 additions & 38 deletions native/jni-bridge/src/errors.rs

Large diffs are not rendered by default.

22 changes: 4 additions & 18 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class CometExecIterator(
partitionIndex: Int,
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
encryptedFilePaths: Seq[String] = Seq.empty,
shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty)
shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty,
taskFilePaths: Seq[String] = Seq.empty)
extends Iterator[ColumnarBatch]
with Logging {

Expand Down Expand Up @@ -169,29 +170,14 @@ class CometExecIterator(
// Handle CometQueryExecutionException with JSON payload first
case e: CometQueryExecutionException =>
logError(s"Native execution for task $taskAttemptId failed", e)
throw SparkErrorConverter.convertToSparkException(e)
throw SparkErrorConverter.convertToSparkException(e, taskFilePaths)

case e: CometNativeException =>
// it is generally considered bad practice to log and then rethrow an
// exception, but it really helps debugging to be able to see which task
// threw the exception, so we log the exception with taskAttemptId here
logError(s"Native execution for task $taskAttemptId failed", e)

val parquetError: scala.util.matching.Regex =
"""^Parquet error: (?:.*)$""".r
e.getMessage match {
case parquetError() =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
// See org.apache.parquet.hadoop.ParquetFileReader for error message.
// _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks
// parameters and raises INTERNAL_ERROR if any are passed.
throw new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2254",
messageParameters = Map.empty,
cause = new SparkException("File is not a Parquet file.", e))
case _ =>
throw e
}
throw e
case e: Throwable =>
throw e
}
Expand Down
17 changes: 15 additions & 2 deletions spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ object SparkErrorConverter extends ShimSparkErrorConverter {
* @return
* the corresponding Spark exception, or the original exception if parsing fails
*/
def convertToSparkException(e: CometQueryExecutionException): Throwable = {
def convertToSparkException(
e: CometQueryExecutionException,
taskFilePaths: Seq[String] = Seq.empty): Throwable = {
try {
if (!e.isJsonMessage()) {
// Not JSON, return original exception
Expand All @@ -83,7 +85,18 @@ object SparkErrorConverter extends ShimSparkErrorConverter {

val json = parse(e.getMessage)
val errorJson = json.extract[ErrorJson]
val params = errorJson.params.getOrElse(Map.empty)
val rawParams = errorJson.params.getOrElse(Map.empty)
// CannotReadFile carries the offending file path natively only for the object_store NotFound
// case; for corrupt/truncated parquet the native error has no path, so fall back to the
// per-task file list threaded in from CometExecIterator.
val params =
if (errorJson.errorType == "CannotReadFile"
&& rawParams.get("filePath").forall(p => p == null || p.toString.isEmpty)
&& taskFilePaths.nonEmpty) {
rawParams + ("filePath" -> taskFilePaths.mkString(","))
} else {
rawParams
}
val errorClass =
errorJson.errorClass.map(_.trim).filter(_.nonEmpty).getOrElse(UNKNOWN_ERROR)

Expand Down
19 changes: 13 additions & 6 deletions spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.apache.comet.serde.OperatorOuterClass
private[spark] class CometExecPartition(
override val index: Int,
val inputPartitions: Array[Partition],
val planDataByKey: Map[String, Array[Byte]])
val planDataByKey: Map[String, Array[Byte]],
val filePaths: Seq[String] = Seq.empty)
extends Partition

/**
Expand Down Expand Up @@ -66,7 +67,8 @@ private[spark] class CometExecRDD(
subqueries: Seq[ScalarSubquery],
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
encryptedFilePaths: Seq[String] = Seq.empty,
shuffleScanIndices: Set[Int] = Set.empty)
shuffleScanIndices: Set[Int] = Set.empty,
@transient perPartitionFilePaths: Array[Seq[String]] = Array.empty)
extends RDD[ColumnarBatch](sc, inputRDDs.map(rdd => new OneToOneDependency(rdd))) {

// Determine partition count: from inputs if available, otherwise from parameter
Expand All @@ -90,7 +92,9 @@ private[spark] class CometExecRDD(
(0 until numPartitions).map { idx =>
val inputParts = inputRDDs.map(_.partitions(idx)).toArray
val planData = perPartitionByKey.map { case (key, arr) => key -> arr(idx) }
new CometExecPartition(idx, inputParts, planData)
val fp =
if (perPartitionFilePaths.length > idx) perPartitionFilePaths(idx) else Seq.empty[String]
new CometExecPartition(idx, inputParts, planData, fp)
}.toArray
}

Expand Down Expand Up @@ -130,7 +134,8 @@ private[spark] class CometExecRDD(
partition.index,
broadcastedHadoopConfForEncryption,
encryptedFilePaths,
shuffleBlockIters)
shuffleBlockIters,
taskFilePaths = partition.filePaths)

// Register ScalarSubqueries so native code can look them up
subqueries.foreach(sub => CometScalarSubquery.setSubquery(it.id, sub))
Expand Down Expand Up @@ -179,7 +184,8 @@ object CometExecRDD {
subqueries: Seq[ScalarSubquery],
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
encryptedFilePaths: Seq[String] = Seq.empty,
shuffleScanIndices: Set[Int] = Set.empty): CometExecRDD = {
shuffleScanIndices: Set[Int] = Set.empty,
perPartitionFilePaths: Array[Seq[String]] = Array.empty): CometExecRDD = {
// scalastyle:on

new CometExecRDD(
Expand All @@ -194,6 +200,7 @@ object CometExecRDD {
subqueries,
broadcastedHadoopConfForEncryption,
encryptedFilePaths,
shuffleScanIndices)
shuffleScanIndices,
perPartitionFilePaths)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ case class CometNativeScanExec(
* all files for all partitions in the driver, we serialize only common metadata (once) and each
* partition's files (lazily, as tasks are scheduled).
*/
@transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
@transient private lazy val serializedPartitionData
: (Array[Byte], Array[Array[Byte]], Array[Seq[String]]) = {
// Outer partitionFilters (wrapper) DPP is resolved by Spark's standard
// prepare -> waitForSubqueries lifecycle, triggered explicitly via
// CometLeafExec.ensureSubqueriesResolved called from
Expand Down Expand Up @@ -225,13 +226,20 @@ case class CometNativeScanExec(
partitionNativeScan.toByteArray
}.toArray

(commonBytes, perPartitionBytes)
// File paths per partition -- threaded through CometExecRDD to CometExecIterator so a native
// CannotReadFile error that lacks a path (corrupt/truncated parquet) can be surfaced as
// FAILED_READ_FILE naming the actual file (see SparkErrorConverter.convertToSparkException).
val perPartitionPaths = filePartitions.map(_.files.map(_.filePath.toString).toSeq).toArray

(commonBytes, perPartitionBytes, perPartitionPaths)
}

def commonData: Array[Byte] = serializedPartitionData._1

def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2

def perPartitionFilePaths: Array[Seq[String]] = serializedPartitionData._3

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val nativeMetrics = CometMetricNode.fromCometPlan(this)
val serializedPlan = CometExec.serializeNativePlan(nativeOp)
Expand Down Expand Up @@ -259,7 +267,8 @@ case class CometNativeScanExec(
nativeMetrics,
Seq.empty,
broadcastedHadoopConfForEncryption,
encryptedFilePaths) {
encryptedFilePaths,
perPartitionFilePaths = perPartitionFilePaths) {
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val res = super.compute(split, context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

case "CannotReadFile" =>
// A per-file read failure of a readable-but-broken file (corrupt/truncated parquet,
// object_store, IO) classified by typed DataFusionError variant on the native side. Wrap
// in the FAILED_READ_FILE SparkException Spark itself produces when its own parquet reader
// fails. (A genuinely-missing file is "FileNotFound" above.) `filePath` is filled by
// SparkErrorConverter from the per-task file list when the native error carried none.
val message = params.get("message").map(_.toString).getOrElse("")
val filePath = params.get("filePath").map(_.toString).getOrElse("")
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

case "CannotReadFile" =>
// A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified
// by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE
// SparkException Spark itself produces when its own parquet reader fails. `filePath` is
// supplied by the native object_store NotFound error or, when empty, filled by
// SparkErrorConverter from the per-task file list.
val message = params.get("message").map(_.toString).getOrElse("")
val filePath = params.get("filePath").map(_.toString).getOrElse("")
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,16 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.fileNotExistError(path, new FileNotFoundException(s"File $path does not exist")))

case "CannotReadFile" =>
// A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified
// by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE
// SparkException Spark itself produces when its own parquet reader fails. `filePath` is
// supplied by the native object_store NotFound error or, when empty, filled by
// SparkErrorConverter from the per-task file list.
val message = params.get("message").map(_.toString).getOrElse("")
val filePath = params.get("filePath").map(_.toString).getOrElse("")
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))

case _ =>
// Unknown error type - return None to trigger fallback
None
Expand Down
41 changes: 41 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3159,4 +3159,45 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("pushed-down predicate divide-by-zero is not surfaced as FAILED_READ_FILE") {
// A throwing expression in a WHERE clause can be pushed into the parquet row filter (reachable
// by default once Scala UDF codegen dispatch routes the UDF natively). DataFusion's row filter
// returns the failure as an ArrowError wrapped by the parquet reader as ParquetError::External,
// which must NOT be misclassified as a corrupt-file read (FAILED_READ_FILE / cannotReadFiles).
// The divide-by-zero must surface as a divide-by-zero. Regression for the FAILED_READ_FILE
// classifier over-matching (see jni-bridge errors.rs parquet_external_wraps_arrow_error).
withSQLConf(
"spark.sql.ansi.enabled" -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true") {
val ident = org.apache.spark.sql.functions.udf((x: Int) => x)
spark.udf.register("comet_test_identity", ident)
withParquetTable(Seq(Tuple1(0), Tuple1(1)), "t") {
val ex = intercept[Exception] {
sql("SELECT 1 AS one FROM t WHERE 1 / comet_test_identity(_1) = 1").collect()
}
val chain = {
val sb = new StringBuilder
var c: Throwable = ex
var depth = 0
while (c != null && depth < 12) {
sb.append(c.getClass.getName).append(": ").append(String.valueOf(c.getMessage))
c = c.getCause
depth += 1
}
sb.toString
}
assert(
!chain.contains("Encountered error while reading file") &&
!chain.contains("FAILED_READ_FILE") &&
!chain.contains("cannotReadFiles"),
s"divide-by-zero must not surface as a file-read error, but got:\n$chain")
assert(
chain.contains("DivideByZero") || chain.contains("DIVIDE_BY_ZERO"),
s"expected a divide-by-zero error, but got:\n$chain")
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,51 @@ package org.apache.comet
import org.scalatest.funsuite.AnyFunSuite

class SparkErrorConverterSuite extends AnyFunSuite {

test("CannotReadFile converts to a FAILED_READ_FILE SparkException naming the file") {
val ex = SparkErrorConverter
.convertErrorType(
"CannotReadFile",
"",
Map(
"filePath" -> "file:/tmp/data/part-0.parquet",
"message" -> "Parquet error: bad footer"),
Array.empty,
null)
.getOrElse(fail("Expected CannotReadFile to be converted to a Spark exception"))
// `cannotReadFilesError` IS the FAILED_READ_FILE path. Assert on the version-stable message
// ("Encountered error while reading file ...") rather than the `FAILED_READ_FILE` literal,
// which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 do not).
assert(ex.getMessage.contains("Encountered error while reading file"))
assert(ex.getMessage.contains("part-0.parquet"))
}

test("CannotReadFile with empty native path falls back to the per-task file list") {
// The native error (e.g. corrupt parquet) carries no path; convertToSparkException must fill
// it from the per-task file list threaded in from CometExecIterator.
val json =
"""{"errorType":"CannotReadFile","errorClass":"",""" +
""""params":{"filePath":"","message":"Parquet error: bad footer"}}"""
val ex = SparkErrorConverter.convertToSparkException(
new org.apache.comet.exceptions.CometQueryExecutionException(json),
taskFilePaths = Seq("file:/tmp/data/part-7.parquet"))
// Version-stable assertion (see above): only Spark 4.x renders the FAILED_READ_FILE class tag.
assert(ex.getMessage.contains("Encountered error while reading file"))
assert(ex.getMessage.contains("part-7.parquet"))
}

test("CannotReadFile prefers the native path over the per-task file list") {
// When the native error supplied a path, keep it rather than the fallback list.
val json =
"""{"errorType":"CannotReadFile","errorClass":"",""" +
""""params":{"filePath":"file:/tmp/data/native.parquet","message":"Parquet error: corrupt footer"}}"""
val ex = SparkErrorConverter.convertToSparkException(
new org.apache.comet.exceptions.CometQueryExecutionException(json),
taskFilePaths = Seq("file:/tmp/data/fallback.parquet"))
assert(ex.getMessage.contains("native.parquet"))
assert(!ex.getMessage.contains("fallback.parquet"))
}

private def castOverflowError(fromType: String, value: String): Throwable = {
SparkErrorConverter
.convertErrorType(
Expand Down
Loading