diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bfe90181ff..a8d2a6762f 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -262,6 +262,18 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_EXEC_COVERAGE_THRESHOLD: ConfigEntry[Double] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.coverageThreshold") + .category(CATEGORY_EXEC) + .doc( + "Minimum fraction of eligible operators that must be converted to Comet native " + + "operators for the native plan to be used. If the coverage is below this threshold, " + + "Comet falls back to the original Spark plan. A value of 0.0 disables this check. " + + "For example, 0.5 means at least 50% of eligible operators must be Comet-accelerated.") + .doubleConf + .checkValue(v => v >= 0.0 && v <= 1.0, "Coverage threshold must be between 0.0 and 1.0") + .createWithDefault(0.0) + val COMET_EXEC_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("project", defaultValue = true) val COMET_EXEC_FILTER_ENABLED: ConfigEntry[Boolean] = diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..fd78480f75 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -116,6 +116,22 @@ to test with both for your specific workloads. To configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`, set `spark.comet.exec.replaceSortMergeJoin=true`. +## Coverage Threshold + +When Comet can only accelerate a small fraction of the operators in a query, the overhead from transitions between +Spark and Comet execution may outweigh the benefit of native execution. The `spark.comet.exec.coverageThreshold` +configuration allows you to set a minimum coverage percentage. 
If the fraction of eligible operators that Comet can convert +falls below this threshold, Comet will fall back to the original Spark plan for the entire query. + +For example, to require that at least 50% of eligible operators are converted before using Comet: + +``` +spark.comet.exec.coverageThreshold=0.5 +``` + +The default value is `0.0`, which disables the check and preserves the existing behavior of always using Comet when +possible. When the threshold is triggered, a warning is logged with the coverage statistics. + ## Shuffle Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries. diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index f47428e801..6ad163bb4c 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -99,18 +99,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { outString: StringBuilder, planStats: CometCoverageStats): Unit = { - node match { - case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | - _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => - // ignore - case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | - _: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec => - planStats.transitions += 1 - case _: CometPlan => - planStats.cometOperators += 1 - case _ => - planStats.sparkOperators += 1 - } + planStats.classifyNode(node) outString.append(" " * indent) if (depth > 0) { @@ -182,14 +171,51 @@ class CometCoverageStats { var cometOperators: Int = 0 var transitions: Int = 0 + def eligible: Int = sparkOperators + cometOperators + + def coverageFraction: Double = + if (eligible == 0) 0.0 else cometOperators.toDouble / eligible + override def toString(): String = { - val eligible = sparkOperators + cometOperators - val 
converted = - if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0 s"Comet accelerated $cometOperators out of $eligible " + - s"eligible operators (${converted.toInt}%). " + + s"eligible operators (${(coverageFraction * 100).toInt}%). " + s"Final plan contains $transitions transitions between Spark and Comet." } + + def classifyNode(node: TreeNode[_]): Unit = { + node match { + case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | + _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => + // ignore + case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | + _: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec => + transitions += 1 + case _: CometPlan => + cometOperators += 1 + case _ => + sparkOperators += 1 + } + } +} + +object CometCoverageStats { + def fromPlan(plan: SparkPlan): CometCoverageStats = { + val stats = new CometCoverageStats() + collectStats(getActualPlan(plan), stats) + stats + } + + private def collectStats(node: TreeNode[_], stats: CometCoverageStats): Unit = { + stats.classifyNode(node) + node.innerChildren.foreach { + case c: TreeNode[_] => collectStats(getActualPlan(c), stats) + case _ => + } + node.children.foreach { + case c: TreeNode[_] => collectStats(getActualPlan(c), stats) + case _ => + } + } } object CometExplainInfo { diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 76e741e3bf..b09a8b8e5c 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} +import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, 
ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs @@ -444,7 +444,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { // Convert native execution block by linking consecutive native operators. var firstNativeOp = true - newPlan.transformDown { + val finalPlan = newPlan.transformDown { case op: CometNativeExec => val newPlan = if (firstNativeOp) { firstNativeOp = false @@ -475,6 +475,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { firstNativeOp = true op } + + // Check coverage threshold - if the fraction of converted operators is too low, + // fall back to the original Spark plan. + val threshold = CometConf.COMET_EXEC_COVERAGE_THRESHOLD.get(conf) + if (threshold > 0.0) { + val stats = CometCoverageStats.fromPlan(finalPlan) + if (stats.eligible > 0 && stats.coverageFraction < threshold) { + logWarning( + s"Comet native coverage ${(stats.coverageFraction * 100).toInt}% " + + s"is below threshold ${(threshold * 100).toInt}% " + + s"($stats). Falling back to Spark plan.") + return plan + } + } + + finalPlan } }