diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index e403285369..44f7601e36 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -523,7 +523,7 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..d628f44e4ee 100644 +index 2796b1cf154..53dcfde932e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} @@ -534,17 +534,46 @@ index 2796b1cf154..d628f44e4ee 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest - } +@@ -516,21 +517,24 @@ class FileBasedDataSourceSuite extends QueryTest + checkAnswer(sql(s"select A from $tableName"), data.select("A")) + + // RuntimeException is triggered at executor side, which is then wrapped as +- // SparkException at driver side +- val e1 = intercept[SparkException] { +- sql(s"select b from $tableName").collect() ++ // SparkException at driver side. Comet native readers throw RuntimeException ++ // directly without the SparkException wrapper. 
++ def getDuplicateFieldError(query: String): RuntimeException = { ++ try { ++ sql(query).collect() ++ fail("Expected an exception").asInstanceOf[RuntimeException] ++ } catch { ++ case e: SparkException => ++ e.getCause.asInstanceOf[RuntimeException] ++ case e: RuntimeException => e ++ } + } +- assert( +- e1.getCause.isInstanceOf[RuntimeException] && +- e1.getCause.getMessage.contains( +- """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) +- val e2 = intercept[SparkException] { +- sql(s"select B from $tableName").collect() +- } +- assert( +- e2.getCause.isInstanceOf[RuntimeException] && +- e2.getCause.getMessage.contains( +- """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) ++ val e1 = getDuplicateFieldError(s"select b from $tableName") ++ assert(e1.getMessage.contains( ++ """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) ++ val e2 = getDuplicateFieldError(s"select B from $tableName") ++ assert(e2.getMessage.contains( ++ """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + } - Seq("parquet", "orc").foreach { format => -- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { -+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { - withTempDir { dir => - val tableName = s"spark_25132_${format}_native" - val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -815,6 +819,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -552,7 +581,7 @@ index 2796b1cf154..d628f44e4ee 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 
+880,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -560,7 +589,7 @@ index 2796b1cf154..d628f44e4ee 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +922,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -568,7 +597,7 @@ index 2796b1cf154..d628f44e4ee 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1107,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -2003,7 +2032,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..d865077684f 100644 +index 104b4e416cd..b8af360fa14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2083,17 +2112,32 @@ index 104b4e416cd..d865077684f 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } +@@ 
-1950,11 +1966,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + """.stripMargin) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { +- val e = intercept[SparkException] { ++ // Spark native readers wrap the error in SparkException. ++ // Comet native readers throw RuntimeException directly. ++ val msg = try { + sql(s"select a from $tableName where b > 0").collect() ++ fail("Expected an exception") ++ } catch { ++ case e: SparkException => ++ assert(e.getCause.isInstanceOf[RuntimeException]) ++ e.getCause.getMessage ++ case e: RuntimeException => ++ e.getMessage + } +- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( +- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) ++ assert(msg.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""), ++ s"Unexpected error message: $msg") + } -- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { -+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { - withTempPath { dir => - val count = 10 - val tableName = "spark_25207" -@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -1985,7 +2011,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2103,7 +2147,7 @@ index 104b4e416cd..d865077684f 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2045,7 +2072,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2113,7 +2157,7 @@ index 104b4e416cd..d865077684f 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ 
-2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2277,7 +2305,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2126,7 +2170,7 @@ index 104b4e416cd..d865077684f 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2337,7 +2369,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 227ac20f0b..c879ac8f2d 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -494,7 +494,7 @@ index a206e97c353..fea1149b67d 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..601cb6647fe 100644 +index 93275487f29..77a27d1c40a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption} @@ -522,17 +522,40 @@ index 93275487f29..601cb6647fe 100644 checkErrorMatchPVals( exception = intercept[SparkException] { testIgnoreMissingFiles(options) -@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest - } - - Seq("parquet", "orc").foreach { format => -- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { -+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) 
{ - withTempDir { dir => - val tableName = s"spark_25132_${format}_native" - val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -656,18 +660,25 @@ class FileBasedDataSourceSuite extends QueryTest + checkAnswer(sql(s"select A from $tableName"), data.select("A")) + + // RuntimeException is triggered at executor side, which is then wrapped as +- // SparkException at driver side ++ // SparkException at driver side. Comet native readers throw ++ // SparkRuntimeException directly without the SparkException wrapper. ++ def getDuplicateFieldError(query: String): SparkRuntimeException = { ++ try { ++ sql(query).collect() ++ fail("Expected an exception").asInstanceOf[SparkRuntimeException] ++ } catch { ++ case e: SparkException => ++ e.getCause.asInstanceOf[SparkRuntimeException] ++ case e: SparkRuntimeException => e ++ } ++ } + checkError( +- exception = intercept[SparkException] { +- sql(s"select b from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], ++ exception = getDuplicateFieldError(s"select b from $tableName"), + errorClass = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) + checkError( +- exception = intercept[SparkException] { +- sql(s"select B from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], ++ exception = getDuplicateFieldError(s"select B from $tableName"), + errorClass = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) +@@ -955,6 +966,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -540,7 +563,7 @@ index 93275487f29..601cb6647fe 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1027,7 @@ class 
FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -548,7 +571,7 @@ index 93275487f29..601cb6647fe 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1069,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -556,7 +579,7 @@ index 93275487f29..601cb6647fe 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1254,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1959,7 +1982,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..f19c12c98e6 100644 +index 8e88049f51e..f9d515edee1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2030,17 +2053,25 @@ index 8e88049f51e..f19c12c98e6 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1952,8 +1966,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - 
val e = intercept[SparkException] { +@@ -1949,11 +1965,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + """.stripMargin) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { +- val e = intercept[SparkException] { ++ // Spark native readers wrap the error in SparkException(FAILED_READ_FILE). ++ // Comet native readers throw SparkRuntimeException directly. ++ val msg = try { sql(s"select a from $tableName where b > 0").collect() ++ fail("Expected an exception") ++ } catch { ++ case e: SparkException => ++ assert(e.getCause.isInstanceOf[RuntimeException]) ++ e.getCause.getMessage ++ case e: RuntimeException => ++ e.getMessage } - assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( - """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) -+ assert(e.getCause.isInstanceOf[RuntimeException]) -+ val msg = e.getCause.getMessage -+ // native_datafusion converts DataFusion's "Unable to get field named" error -+ // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b" vs "B") -+ // because DataFusion resolves field names case-insensitively + assert( + msg.contains( + """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || @@ -2050,7 +2081,7 @@ index 8e88049f51e..f19c12c98e6 100644 } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { -@@ -1984,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2013,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2060,7 +2091,7 @@ index 8e88049f51e..f19c12c98e6 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2074,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2070,7 +2101,7 @@ index 8e88049f51e..f19c12c98e6 100644 withTempPath { dir => val path = 
dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2301,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2307,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2083,7 +2114,7 @@ index 8e88049f51e..f19c12c98e6 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2365,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2371,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 364c62a990..4775c3a2f6 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -723,7 +723,7 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..5c4e370dfff 100644 +index 9c529d14221..2c3ee31723f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -757,17 +757,40 @@ index 9c529d14221..5c4e370dfff 100644 } Seq("json", "orc").foreach { format => -@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest - } - - Seq("parquet", "orc").foreach { format => -- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { -+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { - withTempDir { dir => - val tableName = s"spark_25132_${format}_native" - val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -668,18 +674,25 @@ class FileBasedDataSourceSuite extends QueryTest + checkAnswer(sql(s"select A from $tableName"), data.select("A")) + + // RuntimeException is triggered at executor side, which is then wrapped as +- // SparkException at driver side ++ // SparkException at driver side. Comet native readers throw ++ // SparkRuntimeException directly without the SparkException wrapper. ++ def getDuplicateFieldError(query: String): SparkRuntimeException = { ++ try { ++ sql(query).collect() ++ fail("Expected an exception").asInstanceOf[SparkRuntimeException] ++ } catch { ++ case e: SparkException => ++ e.getCause.asInstanceOf[SparkRuntimeException] ++ case e: SparkRuntimeException => e ++ } ++ } + checkError( +- exception = intercept[SparkException] { +- sql(s"select b from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], ++ exception = getDuplicateFieldError(s"select b from $tableName"), + condition = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) + checkError( +- exception = intercept[SparkException] { +- sql(s"select B from $tableName").collect() +- }.getCause.asInstanceOf[SparkRuntimeException], ++ exception = getDuplicateFieldError(s"select B from $tableName"), + condition = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) +@@ -967,6 +980,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -775,7 +798,7 @@ index 9c529d14221..5c4e370dfff 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1035,7 @@ 
class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1041,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -783,7 +806,7 @@ index 9c529d14221..5c4e370dfff 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1083,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -791,7 +814,7 @@ index 9c529d14221..5c4e370dfff 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1241,7 +1257,8 @@ class FileBasedDataSourceSuite extends QueryTest } } @@ -801,7 +824,7 @@ index 9c529d14221..5c4e370dfff 100644 withTempPath { path => val pathStr = path.getCanonicalPath spark.range(10).write.parquet(pathStr) -@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1252,6 +1269,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -2742,14 +2765,14 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..cef477c8b4d 100644 +index 6080a5e8e4b..ea058d57b4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException} import org.apache.spark.sql._ -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} ++import org.apache.spark.sql.IgnoreCometNativeScan import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints @@ -2821,17 +2844,34 @@ index 6080a5e8e4b..cef477c8b4d 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } +@@ -1956,13 +1971,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + """.stripMargin) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { +- val ex = intercept[SparkException] { ++ // Spark native readers wrap the error in SparkException(FAILED_READ_FILE). ++ // Comet native readers throw SparkRuntimeException directly. 
++ try { + sql(s"select a from $tableName where b > 0").collect() ++ fail("Expected an exception") ++ } catch { ++ case ex: SparkException => ++ assert(ex.getCondition.startsWith("FAILED_READ_FILE")) ++ assert(ex.getCause.isInstanceOf[SparkRuntimeException]) ++ assert(ex.getCause.getMessage.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) ++ case ex: SparkRuntimeException => ++ assert(ex.getMessage.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) + } +- assert(ex.getCondition.startsWith("FAILED_READ_FILE")) +- assert(ex.getCause.isInstanceOf[SparkRuntimeException]) +- assert(ex.getCause.getMessage.contains( +- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) + } -- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { -+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { - withTempPath { dir => - val count = 10 - val tableName = "spark_25207" -@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -1993,7 +2016,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2841,7 +2881,7 @@ index 6080a5e8e4b..cef477c8b4d 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2053,7 +2077,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2851,7 +2891,7 @@ index 6080a5e8e4b..cef477c8b4d 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2305,7 +2330,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { 
assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2864,7 +2904,7 @@ index 6080a5e8e4b..cef477c8b4d 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2368,7 +2397,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 833eda75e1..81bb5537c1 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -62,10 +62,8 @@ cause Comet to fall back to Spark. - No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` -- No support for duplicate field names in case-insensitive mode. When the required or data schema contains - field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Duplicates - in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time; - in that case DataFusion will throw a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, +- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns) + are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. 
The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 535ed70627..9f51fd42f6 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -19,11 +19,12 @@ use crate::parquet::cast_column::CometCastColumnExpr; use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion::common::Result as DataFusionResult; +use datafusion::common::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_expr::expressions::Column; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::ColumnarValue; use datafusion::scalar::ScalarValue; +use datafusion_comet_common::SparkError; use datafusion_comet_spark_expr::{Cast, SparkCastOptions}; use datafusion_physical_expr_adapter::{ replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, @@ -93,6 +94,23 @@ fn remap_physical_schema_names( Arc::new(Schema::new(remapped_fields)) } +/// Check if a specific column name has duplicate matches in the physical schema +/// (case-insensitive). Returns the error info if so. 
+fn check_column_duplicate(col_name: &str, physical_schema: &SchemaRef) -> Option<(String, String)> { + let matches: Vec<&str> = physical_schema + .fields() + .iter() + .filter(|pf| pf.name().eq_ignore_ascii_case(col_name)) + .map(|pf| pf.name().as_str()) + .collect(); + if matches.len() > 1 { + // Include brackets to match the format expected by ShimSparkErrorConverter + Some((col_name.to_string(), format!("[{}]", matches.join(", ")))) + } else { + None + } +} + impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { fn create( &self, @@ -109,7 +127,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { // to the original physical names. This is necessary because downstream code // (reassign_expr_columns) looks up columns by name in the actual stream // schema, which uses the original physical file column names. - let (adapted_physical_schema, logical_to_physical_names) = + let (adapted_physical_schema, logical_to_physical_names, original_physical_schema) = if !self.parquet_options.case_sensitive { let logical_to_physical: HashMap = logical_file_schema .fields() @@ -134,9 +152,11 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { } else { Some(logical_to_physical) }, + // Keep original physical schema for per-column duplicate detection + Some(Arc::clone(&physical_file_schema)), ) } else { - (Arc::clone(&physical_file_schema), None) + (Arc::clone(&physical_file_schema), None, None) }; let default_factory = DefaultPhysicalExprAdapterFactory; @@ -152,6 +172,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { default_values: self.default_values.clone(), default_adapter, logical_to_physical_names, + original_physical_schema, }) } } @@ -181,10 +202,38 @@ struct SparkPhysicalExprAdapter { /// physical names so that downstream reassign_expr_columns can find /// columns in the actual stream schema. 
logical_to_physical_names: Option>, + /// The original (un-remapped) physical schema, kept for per-column duplicate + /// detection in case-insensitive mode. Only set when `!case_sensitive`. + original_physical_schema: Option, } impl PhysicalExprAdapter for SparkPhysicalExprAdapter { fn rewrite(&self, expr: Arc) -> DataFusionResult> { + // In case-insensitive mode, check if any Column in this expression references + // a field with multiple case-insensitive matches in the physical schema. + // Only the columns actually referenced trigger the error (not the whole schema). + if let Some(orig_physical) = &self.original_physical_schema { + // Walk the expression tree to find Column references + let mut duplicate_err: Option = None; + let _ = Arc::::clone(&expr).transform(|e| { + if let Some(col) = e.as_any().downcast_ref::() { + if let Some((req, matched)) = check_column_duplicate(col.name(), orig_physical) + { + duplicate_err = Some(DataFusionError::External(Box::new( + SparkError::DuplicateFieldCaseInsensitive { + required_field_name: req, + matched_fields: matched, + }, + ))); + } + } + Ok(Transformed::no(e)) + }); + if let Some(err) = duplicate_err { + return Err(err); + } + } + // First let the default adapter handle column remapping, missing columns, // and simple scalar type casts. Then replace DataFusion's CastColumnExpr // with Spark-compatible equivalents. 
@@ -531,4 +580,63 @@ mod test {
         let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?;
         stream.next().await.unwrap()
     }
+
+    #[tokio::test]
+    async fn parquet_duplicate_fields_case_insensitive() {
+        // Parquet file has columns "A", "B", "b" - reading "b" in case-insensitive mode
+        // should fail with duplicate field error matching Spark's _LEGACY_ERROR_TEMP_2093
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("A", DataType::Int32, false),
+            Field::new("B", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+
+        let col_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn Array>;
+        let col_b1 = Arc::new(Int32Array::from(vec![4, 5, 6])) as Arc<dyn Array>;
+        let col_b2 = Arc::new(Int32Array::from(vec![7, 8, 9])) as Arc<dyn Array>;
+        let batch =
+            RecordBatch::try_new(Arc::clone(&file_schema), vec![col_a, col_b1, col_b2]).unwrap();
+
+        let filename = get_temp_filename();
+        let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
+        let file = File::create(&filename).unwrap();
+        let mut writer = ArrowWriter::try_new(file, Arc::clone(&batch.schema()), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read with case-insensitive mode, requesting column "b" which matches both "B" and "b"
+        let required_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
+
+        let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
+        spark_parquet_options.case_sensitive = false;
+
+        let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
+            SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None),
+        );
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let parquet_source = ParquetSource::new(required_schema);
+        let files = FileGroup::new(vec![
+            PartitionedFile::from_path(filename.to_string()).unwrap()
+        ]);
+        let file_scan_config =
+            FileScanConfigBuilder::new(object_store_url, Arc::new(parquet_source))
+                .with_file_groups(vec![files])
+
.with_expr_adapter(Some(expr_adapter_factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + let result = stream.next().await.unwrap(); + + // Should fail with duplicate field error + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Found duplicate field"), + "Expected duplicate field error, got: {err_msg}" + ); + } } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 863126397f..7d23de1671 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -222,22 +222,6 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None } - // Case-insensitive mode with duplicate field names produces different errors - // in DataFusion vs Spark, so fall back to avoid incompatible error messages - if (!session.sessionState.conf.caseSensitiveAnalysis) { - val schemas = Seq(scanExec.requiredSchema, r.dataSchema) - for (schema <- schemas) { - val fieldNames = - schema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT)) - if (fieldNames.length != fieldNames.distinct.length) { - withInfo( - scanExec, - "Native DataFusion scan does not support " + - "duplicate field names in case-insensitive mode") - return None - } - } - } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index a369e183c4..cf79f6af0f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ 
b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -61,6 +61,42 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
     }
   }
 
+  test("native reader duplicate fields in case-insensitive mode") {
+    withTempPath { path =>
+      // Write parquet with columns A, B, b (B and b are duplicates case-insensitively)
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark
+          .range(5)
+          .selectExpr("id as A", "id as B", "id as b")
+          .write
+          .mode("overwrite")
+          .parquet(path.toString)
+      }
+      val tbl = s"dup_fields_${System.currentTimeMillis()}"
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        sql(s"create table $tbl (A long, B long) using parquet options (path '${path}')")
+      }
+      // In case-insensitive mode, selecting B should fail because both B and b match
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val e = intercept[Exception] {
+          sql(s"select B from $tbl").collect()
+        }
+        assert(
+          e.getMessage.contains("duplicate field") ||
+            e.getMessage.contains("Found duplicate field") ||
+            (e.getCause != null && e.getCause.getMessage.contains("duplicate field")) ||
+            (e.getCause != null && e.getCause.getMessage.contains("Found duplicate field")),
+          s"Expected duplicate field error, got: ${e.getMessage}")
+      }
+      // In case-sensitive mode, selecting A (an unambiguous column) should work fine
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val df = sql(s"select A from $tbl")
+        assert(df.collect().length == 5)
+      }
+      sql(s"drop table if exists $tbl")
+    }
+  }
+
   test("native reader - read simple STRUCT fields") {
     testSingleLineQuery(
       """