diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala index b3051953740f..8b070083a2e4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala @@ -81,7 +81,13 @@ object HeuristicTransform { node => validator.validate(node) match { case Validator.Passed => - rule.offload(node) + val offloaded = rule.offload(node) + if (offloaded ne node) { + node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).foreach { + lp => offloaded.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, lp) + } + } + offloaded case Validator.Failed(reason) => logDebug(s"Validation failed by reason: $reason on query plan: ${node.nodeName}") if (FallbackTags.maybeOffloadable(node)) { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala index c0c44f390d29..d24616ed1a56 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala @@ -25,7 +25,22 @@ import org.apache.spark.sql.execution.SparkPlan class LegacyOffload(rules: Seq[OffloadSingleNode]) extends Rule[SparkPlan] with LogLevelUtil { def apply(plan: SparkPlan): SparkPlan = { val out = - rules.foldLeft(plan)((p, rule) => p.transformUp { case p => rule.offload(p) }) + rules.foldLeft(plan) { + (p, rule) => + p.transformUp { + case node => + val offloaded = rule.offload(node) + if (offloaded ne node) { + // Propagate logical plan tag from original to offloaded node (non-recursive). 
+ // Using setTagValue directly to avoid setLogicalLink's recursive propagation + // to children, which would incorrectly tag Exchange nodes. + node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).foreach { + lp => offloaded.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, lp) + } + } + offloaded + } + } out } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala index 9a6e271b35ac..b9e02461b1fb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala @@ -36,6 +36,7 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper { scan) && scan.supportPushDownFilters => val newScan = scan.withNewPushdownFilters(splitConjunctivePredicates(filter.cond)) if (newScan.doValidate().ok()) { + newScan.copyTagsFrom(scan) filter.withNewChildren(Seq(newScan)) } else { filter diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index d0716932b756..cc4e5ed0993f 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -222,10 +222,13 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenCodeGeneratorWithInterpretedFallbackSuite] enableSuite[GlutenCollationExpressionSuite] enableSuite[GlutenCollationRegexpExpressionsSuite] - // TODO: 4.x enableSuite[GlutenCsvExpressionsSuite] // failures with GlutenPlugin + enableSuite[GlutenCsvExpressionsSuite] + .exclude("unsupported mode") enableSuite[GlutenDynamicPruningSubquerySuite] enableSuite[GlutenExprIdSuite] - // TODO: 4.x 
enableSuite[GlutenExpressionEvalHelperSuite] // 2 failures + enableSuite[GlutenExpressionEvalHelperSuite] + .exclude("SPARK-16489: checkEvaluation should fail if expression reuses variable names") + .exclude("SPARK-25388: checkEvaluation should fail if nullable in DataType is incorrect") enableSuite[GlutenExpressionImplUtilsSuite] enableSuite[GlutenExpressionSQLBuilderSuite] enableSuite[GlutenExpressionSetSuite] @@ -233,16 +236,21 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenHexSuite] enableSuite[GlutenMutableProjectionSuite] enableSuite[GlutenNamedExpressionSuite] - // TODO: 4.x enableSuite[GlutenObjectExpressionsSuite] // 7 failures + enableSuite[GlutenObjectExpressionsSuite] + .excludeByPrefix("SPARK-2359") + .excludeByPrefix("SPARK-2358") + .exclude("LambdaVariable should support interpreted execution") enableSuite[GlutenOrderingSuite] - // TODO: 4.x enableSuite[GlutenScalaUDFSuite] // 1 failure + enableSuite[GlutenScalaUDFSuite] + .exclude("variant basic output variant") enableSuite[GlutenSchemaPruningSuite] enableSuite[GlutenSelectedFieldSuite] // GlutenSubExprEvaluationRuntimeSuite is removed because SubExprEvaluationRuntimeSuite // is in test-jar without shaded Guava, while SubExprEvaluationRuntime is shaded. 
enableSuite[GlutenSubexpressionEliminationSuite] enableSuite[GlutenTimeWindowSuite] - // TODO: 4.x enableSuite[GlutenToPrettyStringSuite] // 1 failure + enableSuite[GlutenToPrettyStringSuite] + .exclude("Timestamp as pretty strings") enableSuite[GlutenUnsafeRowConverterSuite] enableSuite[GlutenUnwrapUDTExpressionSuite] enableSuite[GlutenV2ExpressionUtilsSuite] @@ -253,7 +261,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataSourceV2MetricsSuite] enableSuite[GlutenDataSourceV2OptionSuite] enableSuite[GlutenDataSourceV2UtilsSuite] - // TODO: 4.x enableSuite[GlutenGroupBasedUpdateTableSuite] // 1 failure + enableSuite[GlutenGroupBasedUpdateTableSuite] + .exclude("update with NOT NULL checks") enableSuite[GlutenMergeIntoDataFrameSuite] enableSuite[GlutenProcedureSuite] enableSuite[GlutenPushablePredicateSuite] @@ -392,8 +401,10 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DoubleType") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text - // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure + enableSuite[GlutenWholeTextFileV1Suite] + .exclude("reading text file with option wholetext=true") + enableSuite[GlutenWholeTextFileV2Suite] + .exclude("reading text file with option wholetext=true") // Generated suites for org.apache.spark.sql.execution.datasources.v2 enableSuite[GlutenFileWriterFactorySuite] enableSuite[GlutenV2SessionCatalogNamespaceSuite] @@ -696,39 +707,85 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenAggregatingAccumulatorSuite] enableSuite[GlutenCoGroupedIteratorSuite] enableSuite[GlutenColumnarRulesSuite] - // TODO: 4.x enableSuite[GlutenDataSourceScanExecRedactionSuite] // 2 failures - // TODO: 4.x enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] // 2 failures + 
enableSuite[GlutenDataSourceScanExecRedactionSuite] + .exclude("explain is redacted using SQLConf") + .exclude("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") + enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] + .exclude("explain is redacted using SQLConf") + .exclude("FileScan description") enableSuite[GlutenExecuteImmediateEndToEndSuite] - // TODO: 4.x enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite] // 14 failures + enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite] enableSuite[GlutenGlobalTempViewSuite] enableSuite[GlutenGlobalTempViewTestSuite] enableSuite[GlutenGroupedIteratorSuite] enableSuite[GlutenHiveResultSuite] - // TODO: 4.x enableSuite[GlutenInsertSortForLimitAndOffsetSuite] // 6 failures + enableSuite[GlutenInsertSortForLimitAndOffsetSuite] + .exclude("root LIMIT preserves data ordering with top-K sort") + .exclude("middle LIMIT preserves data ordering with top-K sort") + .exclude("root LIMIT preserves data ordering with CollectLimitExec") + .exclude("middle LIMIT preserves data ordering with the extra sort") + .exclude("root OFFSET preserves data ordering with CollectLimitExec") + .exclude("middle OFFSET preserves data ordering with the extra sort") enableSuite[GlutenLocalTempViewTestSuite] - // TODO: 4.x enableSuite[GlutenLogicalPlanTagInSparkPlanSuite] // RUN ABORTED + enableSuite[GlutenLogicalPlanTagInSparkPlanSuite] enableSuite[GlutenOptimizeMetadataOnlyQuerySuite] enableSuite[GlutenPersistedViewTestSuite] - // TODO: 4.x enableSuite[GlutenPlannerSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenProjectedOrderingAndPartitioningSuite] // 6 failures + enableSuite[GlutenPlannerSuite] + .excludeByPrefix("efficient terminal limit") + .excludeByPrefix("terminal limit -> project -> sort") + .excludeByPrefix("TakeOrderedAndProject can appear") + .excludeByPrefix("TakeOrderedAndProjectExec appears only") + .excludeByPrefix("SPARK-24242") + .excludeByPrefix("SPARK-24556") + .excludeByPrefix("SPARK-33399") 
+ .excludeByPrefix("SPARK-33400") + .excludeByPrefix("sort order doesn't have repeated") + .excludeByPrefix("aliases to expressions should not be replaced") + .excludeByPrefix("aliases in the object hash") + .excludeByPrefix("SPARK-33758") + .excludeByPrefix("SPARK-40086") + enableSuite[GlutenProjectedOrderingAndPartitioningSuite] + .excludeByPrefix("SPARK-42049") enableSuite[GlutenQueryPlanningTrackerEndToEndSuite] - // TODO: 4.x enableSuite[GlutenRemoveRedundantProjectsSuite] // 14 failures - // TODO: 4.x enableSuite[GlutenRemoveRedundantSortsSuite] // 1 failure + enableSuite[GlutenRemoveRedundantProjectsSuite] + .exclude("project with filter") + .exclude("project with specific column ordering") + .exclude("project with extra columns") + .exclude("project with fewer columns") + .exclude("aggregate without ordering requirement") + .exclude("aggregate with ordering requirement") + .exclude("join without ordering requirement") + .exclude("join with ordering requirement") + .exclude("window function") + .exclude("generate should require column ordering") + .exclude("subquery") + .exclude("SPARK-33697: UnionExec should require column ordering") + .exclude("SPARK-33697: remove redundant projects under expand") + .exclude("SPARK-36020: Project should not be removed when child's logical link is different") + enableSuite[GlutenRemoveRedundantSortsSuite] + .exclude("cached sorted data doesn't need to be re-sorted") + .exclude("SPARK-33472: shuffled join with different left and right side partition numbers") + .exclude("remove redundant sorts with limit") + .exclude("remove redundant sorts with broadcast hash join") + .exclude("remove redundant sorts with sort merge join") enableSuite[GlutenRowToColumnConverterSuite] - // TODO: 4.x enableSuite[GlutenSQLExecutionSuite] // 1 failure + enableSuite[GlutenSQLExecutionSuite] enableSuite[GlutenSQLFunctionSuite] - // TODO: 4.x enableSuite[GlutenSQLJsonProtocolSuite] // 1 failure - // TODO: 4.x 
enableSuite[GlutenShufflePartitionsUtilSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenSimpleSQLViewSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenSparkPlanSuite] // 1 failure + enableSuite[GlutenSQLJsonProtocolSuite] + enableSuite[GlutenShufflePartitionsUtilSuite] + enableSuite[GlutenSimpleSQLViewSuite] + .exclude("alter temporary view should follow current storeAnalyzedPlanForView config") + .exclude("SPARK-53968 reading the view after allowPrecisionLoss is changed") + enableSuite[GlutenSparkPlanSuite] + .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") enableSuite[GlutenSparkPlannerSuite] enableSuite[GlutenSparkScriptTransformationSuite] enableSuite[GlutenSparkSqlParserSuite] enableSuite[GlutenUnsafeFixedWidthAggregationMapSuite] enableSuite[GlutenUnsafeKVExternalSorterSuite] - // TODO: 4.x enableSuite[GlutenUnsafeRowSerializerSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeStageCodegenSparkSubmitSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeStageCodegenSuite] // 24 failures + enableSuite[GlutenUnsafeRowSerializerSuite] + // TODO: 4.x enableSuite[GlutenWholeStageCodegenSparkSubmitSuite] // depends on codegen path + // TODO: 4.x enableSuite[GlutenWholeStageCodegenSuite] // 24 failures: all test WholeStageCodegen which Gluten bypasses enableSuite[GlutenBroadcastExchangeSuite] enableSuite[GlutenLocalBroadcastExchangeSuite] enableSuite[GlutenCoalesceShufflePartitionsSuite] @@ -825,10 +882,15 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataFrameTransposeSuite] enableSuite[GlutenDefaultANSIValueSuite] enableSuite[GlutenDeprecatedDatasetAggregatorSuite] - // TODO: 4.x enableSuite[GlutenExplainSuite] // 1 failure + enableSuite[GlutenExplainSuite] + .exclude("SPARK-33853: explain codegen - check presence of subquery") + .exclude("explain formatted - check presence of subquery in case of DPP") + .exclude("Support ExplainMode in Dataset.explain") + 
.exclude("Explain formatted output for scan operator for datasource V2") enableSuite[GlutenICUCollationsMapSuite] enableSuite[GlutenInlineTableParsingImprovementsSuite] - // TODO: 4.x enableSuite[GlutenJoinHintSuite] // 1 failure + enableSuite[GlutenJoinHintSuite] + .exclude("join strategy hint - shuffle-replicate-nl") enableSuite[GlutenLogQuerySuite] // Overridden .exclude("Query Spark logs with exception using SQL") @@ -839,7 +901,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenRuntimeConfigSuite] enableSuite[GlutenSSBQuerySuite] enableSuite[GlutenSessionStateSuite] - // TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure + // TODO: 4.x enableSuite[GlutenSetCommandSuite] // hive-site.xml hadoop conf not loaded enableSuite[GlutenSparkSessionBuilderSuite] enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] enableSuite[GlutenTPCDSCollationQueryTestSuite] @@ -1093,7 +1155,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFallbackSuite] enableSuite[GlutenHiveSQLQuerySuite] enableSuite[GlutenCollapseProjectExecTransformerSuite] - // TODO: 4.x enableSuite[GlutenSparkSessionExtensionSuite] // 1 failure + // TODO: 4.x enableSuite[GlutenSparkSessionExtensionSuite] // GlutenPlugin interferes with custom session extensions enableSuite[GlutenGroupBasedDeleteFromTableSuite] enableSuite[GlutenDeltaBasedDeleteFromTableSuite] enableSuite[GlutenDataFrameToSchemaSuite] diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala index 8fbb53e24c17..b0b8ae6b2af2 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala @@ -18,4 +18,23 @@ package org.apache.spark.sql 
class GlutenSparkSessionJobTaggingAndCancellationSuite extends SparkSessionJobTaggingAndCancellationSuite - with GlutenTestsTrait {} + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala index 02c0dd69dfd7..a2d2e1bc624c 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala @@ -16,6 +16,27 @@ */ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC_OPT +import org.apache.spark.sql.types.{DoubleType, StructField, StructType} -class GlutenCsvExpressionsSuite extends CsvExpressionsSuite with GlutenTestsTrait {} +class GlutenCsvExpressionsSuite extends CsvExpressionsSuite with GlutenTestsTrait { + + // Gluten's checkEvaluation (DataFrame-based) throws AnalysisException directly, + // not wrapped in TestFailedException as the codegen/interpreted path does. 
+ testGluten("unsupported mode - gluten") { + val csvData = "---" + val schema = StructType(StructField("a", DoubleType) :: Nil) + checkError( + exception = intercept[AnalysisException] { + checkEvaluation( + CsvToStructs(schema, Map("mode" -> "DROPMALFORMED"), Literal(csvData), UTC_OPT), + InternalRow(null)) + }, + condition = "PARSE_MODE_UNSUPPORTED", + parameters = Map("funcName" -> "`from_csv`", "mode" -> "DROPMALFORMED") + ) + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala index f947c4f406aa..cd64b5e55e1f 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala @@ -16,8 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait class GlutenExternalAppendOnlyUnsafeRowArraySuite extends ExternalAppendOnlyUnsafeRowArraySuite - with GlutenTestsTrait {} + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala index 297d3b2a3428..a01c81a08fa3 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala @@ -16,8 +16,176 @@ */ package org.apache.spark.sql.execution +import org.apache.gluten.execution._ + import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Final} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec + +import scala.reflect.ClassTag class GlutenLogicalPlanTagInSparkPlanSuite extends LogicalPlanTagInSparkPlanSuite - with GlutenSQLTestsTrait {} + with GlutenSQLTestsTrait { + + // Override to use Gluten-aware logical plan tag checking. + // Gluten replaces Spark physical operators with Transformer nodes that don't match + // the original Spark pattern matching in LogicalPlanTagInSparkPlanSuite. + override protected def checkGeneratedCode( + plan: SparkPlan, + checkMethodCodeSize: Boolean = true): Unit = { + // Skip parent's codegen check (Gluten doesn't use WholeStageCodegen). + // Only run the Gluten-aware logical plan tag check. 
+ checkGlutenLogicalPlanTag(plan) + } + + private def isFinalAgg(aggExprs: Seq[AggregateExpression]): Boolean = { + aggExprs.nonEmpty && aggExprs.forall(ae => ae.mode == Complete || ae.mode == Final) + } + + private def checkGlutenLogicalPlanTag(plan: SparkPlan): Unit = { + plan match { + // Joins (Gluten + Spark) + case _: BroadcastHashJoinExecTransformerBase | _: ShuffledHashJoinExecTransformerBase | + _: SortMergeJoinExecTransformerBase | _: CartesianProductExecTransformer | + _: BroadcastNestedLoopJoinExecTransformer | _: joins.BroadcastHashJoinExec | + _: joins.ShuffledHashJoinExec | _: joins.SortMergeJoinExec | + _: joins.BroadcastNestedLoopJoinExec | _: joins.CartesianProductExec => + assertLogicalPlanType[Join](plan) + + // Aggregates - only final (Gluten + Spark) + case agg: HashAggregateExecBaseTransformer if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.HashAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.ObjectHashAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.SortAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + + // Window + case _: WindowExecTransformer | _: window.WindowExec => + assertLogicalPlanType[Window](plan) + + // Union + case _: ColumnarUnionExec | _: UnionExec => + assertLogicalPlanType[Union](plan) + + // Sample + case _: SampleExec => + assertLogicalPlanType[Sample](plan) + + // Generate + case _: GenerateExecTransformerBase | _: GenerateExec => + assertLogicalPlanType[Generate](plan) + + // Exchange nodes should NOT have logical plan tags + case _: ColumnarShuffleExchangeExec | _: ColumnarBroadcastExchangeExec | + _: exchange.ShuffleExchangeExec | _: exchange.BroadcastExchangeExec | + _: ReusedExchangeExec => + assert( + plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty, + 
s"${plan.getClass.getSimpleName} should not have a logical plan tag") + + // Subquery exec nodes don't have logical plan tags + case _: SubqueryExec | _: ReusedSubqueryExec => + assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) + + // Gluten infrastructure nodes (no corresponding logical plan) + case _: WholeStageTransformer | _: InputIteratorTransformer | _: ColumnarInputAdapter | + _: VeloxResizeBatchesExec => + // These are Gluten-specific wrapper nodes without logical plan links. + + // Scan trees + case _ if isGlutenScanPlanTree(plan) => + // For scan plan trees (leaf under Project/Filter), we check that the leaf node + // has a correct logical plan link. The intermediate Project/Filter nodes may not + // have tags if they were created by Gluten's rewrite rules. + val physicalLeaves = plan.collectLeaves() + assert( + physicalLeaves.length == 1, + s"Expected 1 physical leaf, got ${physicalLeaves.length}") + + val leafNode = physicalLeaves.head + // Find the logical plan from the leaf or any ancestor with a tag + val logicalPlanOpt = leafNode + .getTagValue(SparkPlan.LOGICAL_PLAN_TAG) + .orElse(leafNode.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)) + .orElse(findLogicalPlanInTree(plan)) + + logicalPlanOpt.foreach { + lp => + val logicalPlan = lp match { + case w: WithCTE => w.plan + case o => o + } + val logicalLeaves = logicalPlan.collectLeaves() + assert( + logicalLeaves.length == 1, + s"Expected 1 logical leaf, got ${logicalLeaves.length}") + physicalLeaves.head match { + case _: RangeExec => assert(logicalLeaves.head.isInstanceOf[Range]) + case _: DataSourceScanExec | _: BasicScanExecTransformer => + assert(logicalLeaves.head.isInstanceOf[LogicalRelation]) + case _: InMemoryTableScanExec => + assert(logicalLeaves.head.isInstanceOf[columnar.InMemoryRelation]) + case _: LocalTableScanExec => assert(logicalLeaves.head.isInstanceOf[LocalRelation]) + case _: ExternalRDDScanExec[_] => + assert(logicalLeaves.head.isInstanceOf[ExternalRDD[_]]) + case 
_: datasources.v2.BatchScanExec => + assert(logicalLeaves.head.isInstanceOf[DataSourceV2Relation]) + case _ => + } + } + return + + case _ => + } + + plan.children.foreach(checkGlutenLogicalPlanTag) + plan.subqueries.foreach(checkGlutenLogicalPlanTag) + } + + @scala.annotation.tailrec + private def isGlutenScanPlanTree(plan: SparkPlan): Boolean = plan match { + case ColumnarToRowExec(i: InputAdapter) => isGlutenScanPlanTree(i.child) + case p: ProjectExec => isGlutenScanPlanTree(p.child) + case p: ProjectExecTransformer => isGlutenScanPlanTree(p.child) + case f: FilterExec => isGlutenScanPlanTree(f.child) + case f: FilterExecTransformerBase => isGlutenScanPlanTree(f.child) + case _: LeafExecNode => true + case _ => false + } + + /** Find any node in the tree that has a LOGICAL_PLAN_TAG. */ + private def findLogicalPlanInTree(plan: SparkPlan): Option[LogicalPlan] = { + plan + .getTagValue(SparkPlan.LOGICAL_PLAN_TAG) + .orElse(plan.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)) + .orElse(plan.children.collectFirst { + case child if findLogicalPlanInTree(child).isDefined => findLogicalPlanInTree(child).get + }) + } + + private def getGlutenLogicalPlan(node: SparkPlan): LogicalPlan = { + node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).getOrElse { + node.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG).getOrElse { + fail(node.getClass.getSimpleName + " does not have a logical plan link") + } + } + } + + private def assertLogicalPlanType[T <: LogicalPlan: ClassTag](node: SparkPlan): Unit = { + val logicalPlan = getGlutenLogicalPlan(node) + val expectedCls = implicitly[ClassTag[T]].runtimeClass + assert( + expectedCls == logicalPlan.getClass, + s"Expected ${expectedCls.getSimpleName} but got ${logicalPlan.getClass.getSimpleName}" + + s" for ${node.getClass.getSimpleName}" + ) + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala index 51c06b3b6900..d7df069d4447 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala @@ -16,6 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenSQLExecutionSuite extends SQLExecutionSuite with GlutenTestsTrait {} +class GlutenSQLExecutionSuite extends SQLExecutionSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + // Inject GlutenPlugin so per-test SparkSessions created by the parent suite load it. + // SparkConf reads system properties prefixed with "spark." as defaults. + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala index 7ce28aae8917..f868ebfd2998 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala @@ -16,6 +16,25 @@ */ package org.apache.spark.sql.execution -import 
org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenSQLJsonProtocolSuite extends SQLJsonProtocolSuite with GlutenTestsTrait {} +class GlutenSQLJsonProtocolSuite extends SQLJsonProtocolSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala index 866d6cea5423..f56e0cc3b8ef 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala @@ -16,6 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenShufflePartitionsUtilSuite extends ShufflePartitionsUtilSuite with GlutenTestsTrait {} +class GlutenShufflePartitionsUtilSuite + extends ShufflePartitionsUtilSuite + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + 
"spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala index a3f0a577d782..3549004e56c9 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala @@ -16,6 +16,37 @@ */ package org.apache.spark.sql.execution +import org.apache.gluten.execution.{ColumnarToRowExecBase => GlutenC2R} + import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.internal.SQLConf + +class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait { -class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {} + testGluten( + "SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { + path => + spark.range(1).write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + // Gluten replaces ColumnarToRowExec with VeloxColumnarToRowExec + val c2r = df.queryExecution.executedPlan + .collectFirst { case p: GlutenC2R => p } + .orElse(df.queryExecution.executedPlan + .collectFirst { case p: ColumnarToRowExec => p }) + .get + try { + spark.range(1).foreach { + _ => + c2r.canonicalized + () + } + } catch { + case e: Throwable => + fail("ColumnarToRow was not canonicalizable", e) + } + } + } + } +} diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala index 39a52a35dda6..2ec6ca1a9fc7 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala @@ -16,6 +16,25 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenUnsafeRowSerializerSuite extends UnsafeRowSerializerSuite with GlutenTestsTrait {} +class GlutenUnsafeRowSerializerSuite extends UnsafeRowSerializerSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 47a1ff3d66e7..46c75f9e112d 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -232,11 +232,15 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenCodeGenerationSuite] 
enableSuite[GlutenCodeGeneratorWithInterpretedFallbackSuite] enableSuite[GlutenCollationExpressionSuite] - // TODO: 4.x enableSuite[GlutenCollationRegexpExpressionsSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenCsvExpressionsSuite] // failures with GlutenPlugin + enableSuite[GlutenCollationRegexpExpressionsSuite] + .exclude("StringSplit expression with collated strings") + enableSuite[GlutenCsvExpressionsSuite] + .exclude("unsupported mode") enableSuite[GlutenDynamicPruningSubquerySuite] enableSuite[GlutenExprIdSuite] - // TODO: 4.x enableSuite[GlutenExpressionEvalHelperSuite] // 2 failures + enableSuite[GlutenExpressionEvalHelperSuite] + .exclude("SPARK-16489: checkEvaluation should fail if expression reuses variable names") + .exclude("SPARK-25388: checkEvaluation should fail if nullable in DataType is incorrect") enableSuite[GlutenExpressionImplUtilsSuite] enableSuite[GlutenExpressionSQLBuilderSuite] enableSuite[GlutenExpressionSetSuite] @@ -244,16 +248,22 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenHexSuite] enableSuite[GlutenMutableProjectionSuite] enableSuite[GlutenNamedExpressionSuite] - // TODO: 4.x enableSuite[GlutenObjectExpressionsSuite] // 7 failures - // TODO: 4.x enableSuite[GlutenOrderingSuite] // 2 failures - // TODO: 4.x enableSuite[GlutenScalaUDFSuite] // 1 failure + enableSuite[GlutenObjectExpressionsSuite] + .excludeByPrefix("SPARK-2359") + .excludeByPrefix("SPARK-2358") + .exclude("LambdaVariable should support interpreted execution") + enableSuite[GlutenOrderingSuite] + .excludeByPrefix("GenerateOrdering with TimeType") + enableSuite[GlutenScalaUDFSuite] + .exclude("variant basic output variant") enableSuite[GlutenSchemaPruningSuite] enableSuite[GlutenSelectedFieldSuite] // GlutenSubExprEvaluationRuntimeSuite is removed because SubExprEvaluationRuntimeSuite // is in test-jar without shaded Guava, while SubExprEvaluationRuntime is shaded. 
enableSuite[GlutenSubexpressionEliminationSuite] enableSuite[GlutenTimeWindowSuite] - // TODO: 4.x enableSuite[GlutenToPrettyStringSuite] // 1 failure + enableSuite[GlutenToPrettyStringSuite] + .exclude("Timestamp as pretty strings") enableSuite[GlutenUnsafeRowConverterSuite] enableSuite[GlutenUnwrapUDTExpressionSuite] enableSuite[GlutenV2ExpressionUtilsSuite] @@ -264,7 +274,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataSourceV2MetricsSuite] enableSuite[GlutenDataSourceV2OptionSuite] enableSuite[GlutenDataSourceV2UtilsSuite] - // TODO: 4.x enableSuite[GlutenGroupBasedUpdateTableSuite] // 1 failure + enableSuite[GlutenGroupBasedUpdateTableSuite] + .exclude("update with NOT NULL checks") enableSuite[GlutenMergeIntoDataFrameSuite] enableSuite[GlutenProcedureSuite] enableSuite[GlutenPushablePredicateSuite] @@ -403,8 +414,10 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DoubleType") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text - // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure + enableSuite[GlutenWholeTextFileV1Suite] + .exclude("reading text file with option wholetext=true") + enableSuite[GlutenWholeTextFileV2Suite] + .exclude("reading text file with option wholetext=true") // Generated suites for org.apache.spark.sql.execution.datasources.v2 enableSuite[GlutenFileWriterFactorySuite] enableSuite[GlutenV2SessionCatalogNamespaceSuite] @@ -665,40 +678,88 @@ class VeloxTestSettings extends BackendTestSettings { // Generated suites for org.apache.spark.sql.execution enableSuite[GlutenAggregatingAccumulatorSuite] enableSuite[GlutenCoGroupedIteratorSuite] - // TODO: 4.x enableSuite[GlutenColumnarRulesSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenDataSourceScanExecRedactionSuite] // 2 failures - // TODO: 4.x 
enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] // 2 failures + enableSuite[GlutenColumnarRulesSuite] + .excludeByPrefix("SPARK-51474") + enableSuite[GlutenDataSourceScanExecRedactionSuite] + .exclude("explain is redacted using SQLConf") + .exclude("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") + enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] + .exclude("explain is redacted using SQLConf") + .exclude("FileScan description") enableSuite[GlutenExecuteImmediateEndToEndSuite] - // TODO: 4.x enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite] // 14 failures + enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite] enableSuite[GlutenGlobalTempViewSuite] enableSuite[GlutenGlobalTempViewTestSuite] enableSuite[GlutenGroupedIteratorSuite] - // TODO: 4.x enableSuite[GlutenHiveResultSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenInsertSortForLimitAndOffsetSuite] // 6 failures + enableSuite[GlutenHiveResultSuite] + .exclude("time formatting in hive result") + enableSuite[GlutenInsertSortForLimitAndOffsetSuite] + .exclude("root LIMIT preserves data ordering with top-K sort") + .exclude("middle LIMIT preserves data ordering with top-K sort") + .exclude("root LIMIT preserves data ordering with CollectLimitExec") + .exclude("middle LIMIT preserves data ordering with the extra sort") + .exclude("root OFFSET preserves data ordering with CollectLimitExec") + .exclude("middle OFFSET preserves data ordering with the extra sort") enableSuite[GlutenLocalTempViewTestSuite] - // TODO: 4.x enableSuite[GlutenLogicalPlanTagInSparkPlanSuite] // RUN ABORTED + enableSuite[GlutenLogicalPlanTagInSparkPlanSuite] enableSuite[GlutenOptimizeMetadataOnlyQuerySuite] enableSuite[GlutenPersistedViewTestSuite] - // TODO: 4.x enableSuite[GlutenPlannerSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenProjectedOrderingAndPartitioningSuite] // 6 failures + enableSuite[GlutenPlannerSuite] + .excludeByPrefix("efficient terminal limit") + 
.excludeByPrefix("terminal limit -> project -> sort") + .excludeByPrefix("TakeOrderedAndProject can appear") + .excludeByPrefix("TakeOrderedAndProjectExec appears only") + .excludeByPrefix("SPARK-24242") + .excludeByPrefix("SPARK-24556") + .excludeByPrefix("SPARK-33399") + .excludeByPrefix("SPARK-33400") + .excludeByPrefix("sort order doesn't have repeated") + .excludeByPrefix("aliases to expressions should not be replaced") + .excludeByPrefix("aliases in the object hash") + .excludeByPrefix("SPARK-33758") + .excludeByPrefix("SPARK-40086") + enableSuite[GlutenProjectedOrderingAndPartitioningSuite] + .excludeByPrefix("SPARK-42049") enableSuite[GlutenQueryPlanningTrackerEndToEndSuite] - // TODO: 4.x enableSuite[GlutenRemoveRedundantProjectsSuite] // 14 failures - // TODO: 4.x enableSuite[GlutenRemoveRedundantSortsSuite] // 1 failure + enableSuite[GlutenRemoveRedundantProjectsSuite] + .exclude("project with filter") + .exclude("project with specific column ordering") + .exclude("project with extra columns") + .exclude("project with fewer columns") + .exclude("aggregate without ordering requirement") + .exclude("aggregate with ordering requirement") + .exclude("join without ordering requirement") + .exclude("join with ordering requirement") + .exclude("window function") + .exclude("generate should require column ordering") + .exclude("subquery") + .exclude("SPARK-33697: UnionExec should require column ordering") + .exclude("SPARK-33697: remove redundant projects under expand") + .exclude("SPARK-36020: Project should not be removed when child's logical link is different") + enableSuite[GlutenRemoveRedundantSortsSuite] + .exclude("cached sorted data doesn't need to be re-sorted") + .exclude("SPARK-33472: shuffled join with different left and right side partition numbers") + .exclude("remove redundant sorts with limit") + .exclude("remove redundant sorts with broadcast hash join") + .exclude("remove redundant sorts with sort merge join") 
enableSuite[GlutenRowToColumnConverterSuite] - // TODO: 4.x enableSuite[GlutenSQLExecutionSuite] // 1 failure + enableSuite[GlutenSQLExecutionSuite] enableSuite[GlutenSQLFunctionSuite] - // TODO: 4.x enableSuite[GlutenSQLJsonProtocolSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenShufflePartitionsUtilSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenSimpleSQLViewSuite] // 2 failures - // TODO: 4.x enableSuite[GlutenSparkPlanSuite] // 1 failure + enableSuite[GlutenSQLJsonProtocolSuite] + enableSuite[GlutenShufflePartitionsUtilSuite] + enableSuite[GlutenSimpleSQLViewSuite] + .exclude("alter temporary view should follow current storeAnalyzedPlanForView config") + .exclude("SPARK-53968 reading the view after allowPrecisionLoss is changed") + enableSuite[GlutenSparkPlanSuite] + .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") enableSuite[GlutenSparkPlannerSuite] enableSuite[GlutenSparkScriptTransformationSuite] enableSuite[GlutenSparkSqlParserSuite] enableSuite[GlutenUnsafeFixedWidthAggregationMapSuite] enableSuite[GlutenUnsafeKVExternalSorterSuite] - // TODO: 4.x enableSuite[GlutenUnsafeRowSerializerSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeStageCodegenSparkSubmitSuite] // 1 failure - // TODO: 4.x enableSuite[GlutenWholeStageCodegenSuite] // 24 failures + enableSuite[GlutenUnsafeRowSerializerSuite] + // TODO: 4.x enableSuite[GlutenWholeStageCodegenSparkSubmitSuite] // depends on codegen path + // TODO: 4.x enableSuite[GlutenWholeStageCodegenSuite] // 24 failures: all tests exercise WholeStageCodegen, which Gluten bypasses enableSuite[GlutenBroadcastExchangeSuite] .exclude("SPARK-52962: broadcast exchange should not reset metrics") // Add Gluten test enableSuite[GlutenLocalBroadcastExchangeSuite]
enableSuite[GlutenDeprecatedDatasetAggregatorSuite] - // TODO: 4.x enableSuite[GlutenExplainSuite] // 1 failure + enableSuite[GlutenExplainSuite] + .exclude("SPARK-33853: explain codegen - check presence of subquery") + .exclude("explain formatted - check presence of subquery in case of DPP") + .exclude("Support ExplainMode in Dataset.explain") + .exclude("Explain formatted output for scan operator for datasource V2") enableSuite[GlutenICUCollationsMapSuite] enableSuite[GlutenInlineTableParsingImprovementsSuite] - // TODO: 4.x enableSuite[GlutenJoinHintSuite] // 1 failure + enableSuite[GlutenJoinHintSuite] + .exclude("join strategy hint - shuffle-replicate-nl") enableSuite[GlutenLogQuerySuite] // Overridden .exclude("Query Spark logs with exception using SQL") enableSuite[GlutenPercentileQuerySuite] - // TODO: 4.x enableSuite[GlutenRandomDataGeneratorSuite] // 232 failures + // TODO: 4.x enableSuite[GlutenRandomDataGeneratorSuite] // 232 TimeType failures, need Velox C++ support enableSuite[GlutenRowJsonSuite] enableSuite[GlutenRowSuite] enableSuite[GlutenRuntimeConfigSuite] enableSuite[GlutenSSBQuerySuite] enableSuite[GlutenSessionStateSuite] - // TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure + // TODO: 4.x enableSuite[GlutenSetCommandSuite] // hive-site.xml hadoop conf not loaded enableSuite[GlutenSparkSessionBuilderSuite] enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] enableSuite[GlutenTPCDSCollationQueryTestSuite] @@ -1084,7 +1150,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFallbackSuite] enableSuite[GlutenHiveSQLQuerySuite] enableSuite[GlutenCollapseProjectExecTransformerSuite] - // TODO: 4.x enableSuite[GlutenSparkSessionExtensionSuite] // 1 failure + // TODO: 4.x enableSuite[GlutenSparkSessionExtensionSuite] // GlutenPlugin interferes with custom session extensions enableSuite[GlutenGroupBasedDeleteFromTableSuite] enableSuite[GlutenDeltaBasedDeleteFromTableSuite] 
enableSuite[GlutenDataFrameToSchemaSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala index 6befbbe213c2..b0b8ae6b2af2 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala @@ -16,8 +16,25 @@ */ package org.apache.spark.sql -import org.apache.spark.sql.shim.GlutenTestsTrait - class GlutenSparkSessionJobTaggingAndCancellationSuite extends SparkSessionJobTaggingAndCancellationSuite - with GlutenTestsTrait {} + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala index 90d2575c9d6e..19e3724823b2 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCsvExpressionsSuite.scala @@ -16,6 +16,27 @@ */ package 
org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC_OPT import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.types.{DoubleType, StructField, StructType} -class GlutenCsvExpressionsSuite extends CsvExpressionsSuite with GlutenTestsTrait {} +class GlutenCsvExpressionsSuite extends CsvExpressionsSuite with GlutenTestsTrait { + + // Gluten's checkEvaluation (DataFrame-based) throws AnalysisException directly, + // not wrapped in TestFailedException as the codegen/interpreted path does. + testGluten("unsupported mode - gluten") { + val csvData = "---" + val schema = StructType(StructField("a", DoubleType) :: Nil) + checkError( + exception = intercept[AnalysisException] { + checkEvaluation( + CsvToStructs(schema, Map("mode" -> "DROPMALFORMED"), Literal(csvData), UTC_OPT), + InternalRow(null)) + }, + condition = "PARSE_MODE_UNSUPPORTED", + parameters = Map("funcName" -> "`from_csv`", "mode" -> "DROPMALFORMED") + ) + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala index cd3aeb6438de..cd64b5e55e1f 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenExternalAppendOnlyUnsafeRowArraySuite.scala @@ -16,8 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait class GlutenExternalAppendOnlyUnsafeRowArraySuite extends ExternalAppendOnlyUnsafeRowArraySuite - with GlutenTestsTrait {} + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + 
System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala index 297d3b2a3428..a01c81a08fa3 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenLogicalPlanTagInSparkPlanSuite.scala @@ -16,8 +16,176 @@ */ package org.apache.spark.sql.execution +import org.apache.gluten.execution._ + import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Final} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec + +import scala.reflect.ClassTag class GlutenLogicalPlanTagInSparkPlanSuite extends LogicalPlanTagInSparkPlanSuite - with GlutenSQLTestsTrait {} + with GlutenSQLTestsTrait { + + // Override to use Gluten-aware logical plan tag checking. 
+ // Gluten replaces Spark physical operators with Transformer nodes that don't match + // the original Spark pattern matching in LogicalPlanTagInSparkPlanSuite. + override protected def checkGeneratedCode( + plan: SparkPlan, + checkMethodCodeSize: Boolean = true): Unit = { + // Skip parent's codegen check (Gluten doesn't use WholeStageCodegen). + // Only run the Gluten-aware logical plan tag check. + checkGlutenLogicalPlanTag(plan) + } + + private def isFinalAgg(aggExprs: Seq[AggregateExpression]): Boolean = { + aggExprs.nonEmpty && aggExprs.forall(ae => ae.mode == Complete || ae.mode == Final) + } + + private def checkGlutenLogicalPlanTag(plan: SparkPlan): Unit = { + plan match { + // Joins (Gluten + Spark) + case _: BroadcastHashJoinExecTransformerBase | _: ShuffledHashJoinExecTransformerBase | + _: SortMergeJoinExecTransformerBase | _: CartesianProductExecTransformer | + _: BroadcastNestedLoopJoinExecTransformer | _: joins.BroadcastHashJoinExec | + _: joins.ShuffledHashJoinExec | _: joins.SortMergeJoinExec | + _: joins.BroadcastNestedLoopJoinExec | _: joins.CartesianProductExec => + assertLogicalPlanType[Join](plan) + + // Aggregates - only final (Gluten + Spark) + case agg: HashAggregateExecBaseTransformer if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.HashAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.ObjectHashAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + case agg: aggregate.SortAggregateExec if isFinalAgg(agg.aggregateExpressions) => + assertLogicalPlanType[Aggregate](plan) + + // Window + case _: WindowExecTransformer | _: window.WindowExec => + assertLogicalPlanType[Window](plan) + + // Union + case _: ColumnarUnionExec | _: UnionExec => + assertLogicalPlanType[Union](plan) + + // Sample + case _: SampleExec => + assertLogicalPlanType[Sample](plan) + + // Generate + case 
_: GenerateExecTransformerBase | _: GenerateExec => + assertLogicalPlanType[Generate](plan) + + // Exchange nodes should NOT have logical plan tags + case _: ColumnarShuffleExchangeExec | _: ColumnarBroadcastExchangeExec | + _: exchange.ShuffleExchangeExec | _: exchange.BroadcastExchangeExec | + _: ReusedExchangeExec => + assert( + plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty, + s"${plan.getClass.getSimpleName} should not have a logical plan tag") + + // Subquery exec nodes don't have logical plan tags + case _: SubqueryExec | _: ReusedSubqueryExec => + assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) + + // Gluten infrastructure nodes (no corresponding logical plan) + case _: WholeStageTransformer | _: InputIteratorTransformer | _: ColumnarInputAdapter | + _: VeloxResizeBatchesExec => + // These are Gluten-specific wrapper nodes without logical plan links. + + // Scan trees + case _ if isGlutenScanPlanTree(plan) => + // For scan plan trees (leaf under Project/Filter), we check that the leaf node + // has a correct logical plan link. The intermediate Project/Filter nodes may not + // have tags if they were created by Gluten's rewrite rules. 
+ val physicalLeaves = plan.collectLeaves() + assert( + physicalLeaves.length == 1, + s"Expected 1 physical leaf, got ${physicalLeaves.length}") + + val leafNode = physicalLeaves.head + // Find the logical plan from the leaf or any ancestor with a tag + val logicalPlanOpt = leafNode + .getTagValue(SparkPlan.LOGICAL_PLAN_TAG) + .orElse(leafNode.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)) + .orElse(findLogicalPlanInTree(plan)) + + logicalPlanOpt.foreach { + lp => + val logicalPlan = lp match { + case w: WithCTE => w.plan + case o => o + } + val logicalLeaves = logicalPlan.collectLeaves() + assert( + logicalLeaves.length == 1, + s"Expected 1 logical leaf, got ${logicalLeaves.length}") + physicalLeaves.head match { + case _: RangeExec => assert(logicalLeaves.head.isInstanceOf[Range]) + case _: DataSourceScanExec | _: BasicScanExecTransformer => + assert(logicalLeaves.head.isInstanceOf[LogicalRelation]) + case _: InMemoryTableScanExec => + assert(logicalLeaves.head.isInstanceOf[columnar.InMemoryRelation]) + case _: LocalTableScanExec => assert(logicalLeaves.head.isInstanceOf[LocalRelation]) + case _: ExternalRDDScanExec[_] => + assert(logicalLeaves.head.isInstanceOf[ExternalRDD[_]]) + case _: datasources.v2.BatchScanExec => + assert(logicalLeaves.head.isInstanceOf[DataSourceV2Relation]) + case _ => + } + } + return + + case _ => + } + + plan.children.foreach(checkGlutenLogicalPlanTag) + plan.subqueries.foreach(checkGlutenLogicalPlanTag) + } + + @scala.annotation.tailrec + private def isGlutenScanPlanTree(plan: SparkPlan): Boolean = plan match { + case ColumnarToRowExec(i: InputAdapter) => isGlutenScanPlanTree(i.child) + case p: ProjectExec => isGlutenScanPlanTree(p.child) + case p: ProjectExecTransformer => isGlutenScanPlanTree(p.child) + case f: FilterExec => isGlutenScanPlanTree(f.child) + case f: FilterExecTransformerBase => isGlutenScanPlanTree(f.child) + case _: LeafExecNode => true + case _ => false + } + + /** Find any node in the tree that has a 
LOGICAL_PLAN_TAG. */ + private def findLogicalPlanInTree(plan: SparkPlan): Option[LogicalPlan] = { + plan + .getTagValue(SparkPlan.LOGICAL_PLAN_TAG) + .orElse(plan.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)) + .orElse(plan.children.collectFirst { + case child if findLogicalPlanInTree(child).isDefined => findLogicalPlanInTree(child).get + }) + } + + private def getGlutenLogicalPlan(node: SparkPlan): LogicalPlan = { + node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).getOrElse { + node.getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG).getOrElse { + fail(node.getClass.getSimpleName + " does not have a logical plan link") + } + } + } + + private def assertLogicalPlanType[T <: LogicalPlan: ClassTag](node: SparkPlan): Unit = { + val logicalPlan = getGlutenLogicalPlan(node) + val expectedCls = implicitly[ClassTag[T]].runtimeClass + assert( + expectedCls == logicalPlan.getClass, + s"Expected ${expectedCls.getSimpleName} but got ${logicalPlan.getClass.getSimpleName}" + + s" for ${node.getClass.getSimpleName}" + ) + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala index 27865af762ec..d7df069d4447 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLExecutionSuite.scala @@ -16,6 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenSQLExecutionSuite extends SQLExecutionSuite with GlutenTestsTrait {} +class GlutenSQLExecutionSuite extends SQLExecutionSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + // Inject GlutenPlugin so per-test SparkSessions created by the parent suite load it. + // SparkConf reads system properties prefixed with "spark." 
as defaults. + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala index 2ca7b4380d65..f868ebfd2998 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSQLJsonProtocolSuite.scala @@ -16,6 +16,25 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenSQLJsonProtocolSuite extends SQLJsonProtocolSuite with GlutenTestsTrait {} +class GlutenSQLJsonProtocolSuite extends SQLJsonProtocolSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + 
System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala index 5a6595bd9eb3..f56e0cc3b8ef 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenShufflePartitionsUtilSuite.scala @@ -16,6 +16,27 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenShufflePartitionsUtilSuite extends ShufflePartitionsUtilSuite with GlutenTestsTrait {} +class GlutenShufflePartitionsUtilSuite + extends ShufflePartitionsUtilSuite + with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", "org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala index a3f0a577d782..3549004e56c9 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenSparkPlanSuite.scala @@ -16,6 +16,37 @@ */ package 
org.apache.spark.sql.execution +import org.apache.gluten.execution.{ColumnarToRowExecBase => GlutenC2R} + import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.internal.SQLConf + +class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait { -class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {} + testGluten( + "SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { + path => + spark.range(1).write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + // Gluten replaces ColumnarToRowExec with VeloxColumnarToRowExec + val c2r = df.queryExecution.executedPlan + .collectFirst { case p: GlutenC2R => p } + .orElse(df.queryExecution.executedPlan + .collectFirst { case p: ColumnarToRowExec => p }) + .get + try { + spark.range(1).foreach { + _ => + c2r.canonicalized + () + } + } catch { + case e: Throwable => + fail("ColumnarToRow was not canonicalizable", e) + } + } + } + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala index d81f8d58e7b3..2ec6ca1a9fc7 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenUnsafeRowSerializerSuite.scala @@ -16,6 +16,25 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.shim.GlutenTestsTrait +import org.apache.spark.sql.GlutenTestsCommonTrait -class GlutenUnsafeRowSerializerSuite extends UnsafeRowSerializerSuite with GlutenTestsTrait {} +class GlutenUnsafeRowSerializerSuite extends UnsafeRowSerializerSuite with GlutenTestsCommonTrait { + + override def beforeAll(): Unit = { + System.setProperty("spark.plugins", 
"org.apache.gluten.GlutenPlugin") + System.setProperty("spark.memory.offHeap.enabled", "true") + System.setProperty("spark.memory.offHeap.size", "1024MB") + System.setProperty( + "spark.shuffle.manager", + "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + System.clearProperty("spark.plugins") + System.clearProperty("spark.memory.offHeap.enabled") + System.clearProperty("spark.memory.offHeap.size") + System.clearProperty("spark.shuffle.manager") + } +}