From 803ae92b1e933d63571699967db86b4964afc12a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 28 Mar 2026 09:58:10 -0600 Subject: [PATCH 1/2] fix: remove unnecessary IgnoreCometNativeDataFusion tags from 3.5.8 diff Remove IgnoreCometNativeDataFusion from tests that pass with native_datafusion scan in the 3.5.8 Spark SQL test diff. Also fix ExtractPythonUDFsSuite to match CometNativeScanExec in plan checks, and update DPP test issue reference from #3313 to #3442 for consistency with other diffs. Tests that still need the tag (bucketed read/scan suites) are kept as they require helper method updates to support CometNativeScanExec. --- dev/diffs/3.5.8.diff | 245 +++++-------------------------------------- 1 file changed, 28 insertions(+), 217 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 22dee455a2..fbe7599233 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -240,20 +240,6 @@ index e5494726695..00937f025c2 100644 } test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -index 9e8d77c53f3..855e3ada7d1 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession { - } - } - -- test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { -+ test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withTempPath { dir => - val data = sparkContext.parallelize(0 to 10).toDF("id") - data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 6f3090d8908..c08a60fb0c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -412,7 +398,7 @@ index c4fb4fa943c..a04b23870a8 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..42eb9fd1cb7 100644 +index f33432ddb6f..7122af0d414 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -469,7 +455,7 @@ index f33432ddb6f..42eb9fd1cb7 100644 test("static scan metrics", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -483,20 +469,10 @@ index f33432ddb6f..42eb9fd1cb7 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a206e97c353..79813d8e259 100644 +index a206e97c353..fea1149b67d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite - } - } - -- test("explain formatted - check presence of subquery in case of DPP") { -+ test("explain formatted - check presence of subquery in case of DPP", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { - withTable("df1", "df2") { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", - SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", -@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -506,7 +482,7 @@ index a206e97c353..79813d8e259 100644 withTempDir { dir => Seq("parquet", "orc", "csv", "json").foreach { fmt => val basePath = dir.getCanonicalPath + "/" + fmt -@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite +@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } @@ -1050,20 +1026,6 @@ index 04702201f82..5ee11f83ecf 100644 } assert(exchanges.size === 1) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -index 9f8e979e3fb..3bc9dab8023 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession { - spark.catalog.dropTempView("tmp_table") - } - -- test("SPARK-8005 input_file_name") { -+ test("SPARK-8005 input_file_name", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withTempPath { dir => - val data = sparkContext.parallelize(0 to 10, 2).toDF("id") - data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index d269290e616..13726a31e07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1128,31 +1090,18 @@ index d269290e616..13726a31e07 100644 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -index cfc8b2cc845..b7c234e1437 100644 +index cfc8b2cc845..c4be7eb3731 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector - import scala.collection.mutable.ArrayBuffer - +@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf --import org.apache.spark.sql.{AnalysisException, QueryTest} -+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest} + import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { - } - } - -- test("Fallback Parquet V2 to V1") { -+ test("Fallback Parquet V2 to V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { - val commands = ArrayBuffer.empty[(String, LogicalPlan)] -@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { +@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { val df = spark.read.format(format).load(path.getCanonicalPath) checkAnswer(df, inputData.toDF()) assert( @@ -1416,28 +1365,6 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index a1147c16cc8..c7a29496328 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution - - import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.TableIdentifier - import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} - import org.apache.spark.sql.catalyst.parser.ParseException -@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { - } - } - -- test("alter temporary view should follow current storeAnalyzedPlanForView config") { -+ test("alter temporary view should follow current storeAnalyzedPlanForView config", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - withTable("t") { - Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") - withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index eec396b2e39..bf3f1c769d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -2032,7 +1959,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..b713ccddfcb 100644 +index 8e88049f51e..f19c12c98e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2058,17 +1985,7 @@ index 8e88049f51e..b713ccddfcb 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } - -- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") { -+ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { - import testImplicits._ - - withAllParquetReaders { -@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2081,15 +1998,7 @@ index 8e88049f51e..b713ccddfcb 100644 } } } - } - -- test("Filters should be pushed down for Parquet readers at row group level") { -+ test("Filters should be pushed down for Parquet readers at row group level", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { - import testImplicits._ - - withSQLConf( -@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2102,7 +2011,7 @@ index 8e88049f51e..b713ccddfcb 100644 } } } -@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } @@ -2111,7 +2020,7 @@ index 8e88049f51e..b713ccddfcb 100644 import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2121,7 +2030,7 @@ index 8e88049f51e..b713ccddfcb 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1952,8 +1968,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1952,8 +1966,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val e = intercept[SparkException] { sql(s"select a from $tableName where b > 0").collect() } @@ -2141,7 +2050,7 @@ index 8e88049f51e..b713ccddfcb 100644 } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { -@@ -1984,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2151,7 +2060,7 @@ index 8e88049f51e..b713ccddfcb 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2161,7 +2070,7 @@ index 8e88049f51e..b713ccddfcb 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2303,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2301,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2174,7 +2083,7 @@ index 8e88049f51e..b713ccddfcb 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2367,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2365,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2480,42 +2389,32 @@ index 5cdbdc27b32..307fba16578 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..7b81f3a8f6d 100644 +index 0ab8691801d..b18a5bea944 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -17,7 +17,9 @@ - +@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python -+import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { - assert(arrowEvalNodes.size == 2) - } - -- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { -+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { f => - spark.range(10).select($"id".as("a"), $"id".as("b")) -@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 @@ -2524,13 +2423,14 @@ index 0ab8691801d..7b81f3a8f6d 100644 + val dataFilters = scanNodes.head match { + case scan: FileSourceScanExec => scan.dataFilters + case scan: CometScanExec => scan.dataFilters ++ case scan: CometNativeScanExec => scan.dataFilters + } + assert(dataFilters.length == 2) + assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } -@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2538,7 +2438,7 @@ index 0ab8691801d..7b81f3a8f6d 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3006,72 +2906,6 @@ index aad91601758..201083bd621 100644 }) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -index b5cf13a9c12..ac17603fb7f 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar - - import org.apache.spark.{SparkException, TestUtils} - import org.apache.spark.internal.Logging --import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} -+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode} - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} - import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} -@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi - ) - } - -- test("SPARK-41198: input row calculation with CTE") { -+ test("SPARK-41198: input row calculation with CTE", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - withTable("parquet_tbl", "parquet_streaming_tbl") { - spark.range(0, 10).selectExpr("id AS col1", "id AS col2") - .write.format("parquet").saveAsTable("parquet_tbl") -@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi - } - } - -- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { -+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - withTable("parquet_streaming_tbl") { - val streamInput = MemoryStream[Int] - val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -index 8f099c31e6b..ce4b7ad25b3 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala -@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming - import org.scalatest.BeforeAndAfter - import org.scalatest.concurrent.PatienceConfiguration.Timeout - --import org.apache.spark.sql.SaveMode -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode} - import org.apache.spark.sql.connector.catalog.Identifier - import org.apache.spark.sql.execution.streaming.MemoryStream - import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog} -@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter { - sqlContext.streams.active.foreach(_.stop()) - } - -- test("self-union, DSv1, read via DataStreamReader API") { -+ test("self-union, DSv1, read via DataStreamReader API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { - withTempPath { dir => - val dataLocation = dir.getAbsolutePath - spark.range(1, 4).write.format("parquet").save(dataLocation) -@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter { - } - } - -- test("self-union, DSv1, read via table API") { -+ test("self-union, DSv1, read via table API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { - withTable("parquet_streaming_tbl") { - spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet") - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index abe606ad9c1..2d930b64cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -3282,29 +3116,6 @@ index de3b1ffccf0..2a76d127093 100644 override def beforeEach(): Unit = { super.beforeEach() -diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -index f3be79f9022..b4b1ea8dbc4 100644 ---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn - import org.apache.hadoop.io.{LongWritable, Writable} - - import org.apache.spark.{SparkException, SparkFiles, TestUtils} --import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode - import org.apache.spark.sql.catalyst.plans.logical.Project - import org.apache.spark.sql.execution.WholeStageCodegenExec -@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - } - } - -- test("SPARK-11522 select input_file_name from non-parquet table") { -+ test("SPARK-11522 select input_file_name from non-parquet table", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - - withTempDir { tempDir => - diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6160c3e5f6c..0956d7d9edc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala From e27eb1b0379f5aa78f7bfabb310cdf35008b758f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 28 Mar 2026 10:06:20 -0600 Subject: [PATCH 2/2] fix: update bucketed tests to support CometNativeScanExec Add CometNativeScanExec to plan node pattern matches in BucketedReadSuite and DisableUnnecessaryBucketedScanSuite helper methods, allowing all #3319 tests to pass with native_datafusion scan. --- dev/diffs/3.5.8.diff | 103 +++++++++---------------------------------- 1 file changed, 20 insertions(+), 83 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index fbe7599233..227ac20f0b 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2463,7 +2463,7 @@ index d083cac48ff..3c11bcde807 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala -index 746f289c393..5b9e31c1fa6 100644 +index 746f289c393..e5dc13b87d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources @@ -2488,7 +2488,7 @@ index 746f289c393..5b9e31c1fa6 100644 import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } @@ -2498,6 +2498,7 @@ index 746f289c393..5b9e31c1fa6 100644 + val fileScan = collect(plan) { + case f: FileSourceScanExec => f + case f: CometScanExec => f ++ case f: CometNativeScanExec => f + } assert(fileScan.nonEmpty, plan) fileScan.head @@ -2506,12 +2507,13 @@ index 746f289c393..5b9e31c1fa6 100644 + private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match { + case fs: FileSourceScanExec => fs.bucketedScan + case bs: CometScanExec => bs.bucketedScan ++ case ns: CometNativeScanExec => ns.bucketedScan + } + // To verify if the bucket pruning works, this function checks two conditions: // 1) Check if the pruned buckets (before filtering) are empty. // 2) Verify the final result is the same as the expected one -@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition) .queryExecution.executedPlan val fileScan = getFileScan(planWithoutBucketedScan) @@ -2521,7 +2523,7 @@ index 746f289c393..5b9e31c1fa6 100644 val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType val rowsWithInvalidBuckets = fileScan.execute().filter(row => { -@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) { val executedPlan = joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan @@ -2584,14 +2586,7 @@ index 746f289c393..5b9e31c1fa6 100644 s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") // check the output partitioning -@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti - } - } - -- test("disable bucketing when the output doesn't contain all bucketing columns") { -+ test("disable bucketing when the output doesn't contain all bucketing columns", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("bucketed_table") { +@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") val scanDF = spark.table("bucketed_table").select("j") @@ -2605,7 +2600,7 @@ index 746f289c393..5b9e31c1fa6 100644 checkAnswer(aggDF, df1.groupBy("j").agg(max("k"))) } } -@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") { @@ -2616,7 +2611,7 @@ index 746f289c393..5b9e31c1fa6 100644 SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil)) -@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") { @@ -2627,7 +2622,7 @@ index 746f289c393..5b9e31c1fa6 100644 SQLConf.SHUFFLE_PARTITIONS.key -> "9", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") { -@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("bucket coalescing eliminates shuffle") { @@ -2638,17 +2633,7 @@ index 746f289c393..5b9e31c1fa6 100644 SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. -@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti - } - } - -- test("bucket coalescing is applied when join expressions match with partitioning expressions") { -+ test("bucket coalescing is applied when join expressions match with partitioning expressions", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") - df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") -@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti Seq(true, false).foreach { aqeEnabled => withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { val plan = sql(query).queryExecution.executedPlan @@ -2659,6 +2644,7 @@ index 746f289c393..5b9e31c1fa6 100644 val scans = collect(plan) { case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f + case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b ++ case n: CometNativeScanExec if n.optionalNumCoalescedBuckets.isDefined => n } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) @@ -2668,6 +2654,8 @@ index 746f289c393..5b9e31c1fa6 100644 + assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + case b: CometScanExec => + assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) ++ case n: CometNativeScanExec => ++ assert(n.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + } } else { assert(scans.isEmpty) @@ -2697,20 +2685,18 @@ index 6f897a9c0b7..b0723634f68 100644 protected override lazy val sql = spark.sql _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala -index d675503a8ba..c386a8cb686 100644 +index d675503a8ba..f220892396e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala -@@ -17,7 +17,8 @@ - +@@ -18,6 +18,7 @@ package org.apache.spark.sql.sources --import org.apache.spark.sql.QueryTest -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest} -+import org.apache.spark.sql.comet.CometScanExec + import org.apache.spark.sql.QueryTest ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.internal.SQLConf -@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite +@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = { val plan = sql(query).queryExecution.executedPlan @@ -2718,60 +2704,11 @@ index d675503a8ba..c386a8cb686 100644 + val bucketedScan = collect(plan) { + case s: FileSourceScanExec if s.bucketedScan => s + case s: CometScanExec if s.bucketedScan => s ++ case s: CometNativeScanExec if s.bucketedScan => s + } assert(bucketedScan.length == expectedNumBucketedScan) } -@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite - } - } - -- test("SPARK-32859: disable unnecessary bucketed table scan - basic test") { -+ test("SPARK-32859: disable unnecessary bucketed table scan - basic test", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") - df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") -@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite - } - } - -- test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test") { -+ test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins test", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") - df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") -@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite - } - } - -- test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test") { -+ test("SPARK-32859: disable unnecessary bucketed table scan - multiple bucketed columns test", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") - df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2") -@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite - } - } - -- test("SPARK-32859: disable unnecessary bucketed table scan - other operators test") { -+ test("SPARK-32859: disable unnecessary bucketed table scan - other operators test", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") - df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2") -@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite - } - } - -- test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") { -+ test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319")) { - withTable("t1") { - withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { - sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 7f6fa2a123e..c778b4e2c48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala