From fa0955b4376d9258824551b816e3a5d58ade0d22 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Mar 2026 15:04:51 -0600 Subject: [PATCH 1/3] feat: add coverage threshold to skip Comet for low-coverage queries Add spark.comet.exec.coverageThreshold config (0.0-1.0, default 0.0) that reverts to the original Spark plan when the fraction of converted operators falls below the threshold. This avoids the overhead of Spark-to-Comet transitions for queries where only a small percentage of operators can run natively. --- .../scala/org/apache/comet/CometConf.scala | 12 ++++++ .../apache/comet/ExtendedExplainInfo.scala | 41 +++++++++++++++++-- .../apache/comet/rules/CometExecRule.scala | 20 ++++++++- 3 files changed, 67 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bfe90181ff..a8d2a6762f 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -262,6 +262,18 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_EXEC_COVERAGE_THRESHOLD: ConfigEntry[Double] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.coverageThreshold") + .category(CATEGORY_EXEC) + .doc( + "Minimum fraction of eligible operators that must be converted to Comet native " + + "operators for the native plan to be used. If the coverage is below this threshold, " + + "Comet falls back to the original Spark plan. A value of 0.0 disables this check. 
" + + "For example, 0.5 means at least 50% of eligible operators must be Comet-accelerated.") + .doubleConf + .checkValue(v => v >= 0.0 && v <= 1.0, "Coverage threshold must be between 0.0 and 1.0") + .createWithDefault(0.0) + val COMET_EXEC_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("project", defaultValue = true) val COMET_EXEC_FILTER_ENABLED: ConfigEntry[Boolean] = diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index f47428e801..74af087a8f 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -182,16 +182,49 @@ class CometCoverageStats { var cometOperators: Int = 0 var transitions: Int = 0 + def eligible: Int = sparkOperators + cometOperators + + def coveragePercent: Double = + if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0 + override def toString(): String = { - val eligible = sparkOperators + cometOperators - val converted = - if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0 s"Comet accelerated $cometOperators out of $eligible " + - s"eligible operators (${converted.toInt}%). " + + s"eligible operators (${coveragePercent.toInt}%). " + s"Final plan contains $transitions transitions between Spark and Comet." 
} } +object CometCoverageStats { + def fromPlan(plan: SparkPlan): CometCoverageStats = { + val stats = new CometCoverageStats() + collectStats(getActualPlan(plan), stats) + stats + } + + private def collectStats(node: TreeNode[_], stats: CometCoverageStats): Unit = { + node match { + case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | + _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => + // ignore + case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | + _: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec => + stats.transitions += 1 + case _: CometPlan => + stats.cometOperators += 1 + case _ => + stats.sparkOperators += 1 + } + node.innerChildren.foreach { + case c: TreeNode[_] => collectStats(getActualPlan(c), stats) + case _ => + } + node.children.foreach { + case c: TreeNode[_] => collectStats(getActualPlan(c), stats) + case _ => + } + } +} + object CometExplainInfo { val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo") diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 76e741e3bf..c2987fe962 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} +import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs @@ -444,7 +444,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { // 
Convert native execution block by linking consecutive native operators. var firstNativeOp = true - newPlan.transformDown { + val finalPlan = newPlan.transformDown { case op: CometNativeExec => val newPlan = if (firstNativeOp) { firstNativeOp = false @@ -475,6 +475,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { firstNativeOp = true op } + + // Check coverage threshold - if the fraction of converted operators is too low, + // fall back to the original Spark plan. + val threshold = CometConf.COMET_EXEC_COVERAGE_THRESHOLD.get(conf) + if (threshold > 0.0) { + val stats = CometCoverageStats.fromPlan(finalPlan) + val coverage = stats.coveragePercent / 100.0 + if (stats.eligible > 0 && coverage < threshold) { + logWarning( + s"Comet native coverage ${(coverage * 100).toInt}% is below threshold " + + s"${(threshold * 100).toInt}% ($stats). Falling back to Spark plan.") + return plan + } + } + + finalPlan } } From 229697483f73a3b5da261776c9614de5857b3b11 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Mar 2026 15:06:30 -0600 Subject: [PATCH 2/3] docs: add coverage threshold section to tuning guide --- docs/source/user-guide/latest/tuning.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..fd78480f75 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -116,6 +116,22 @@ to test with both for your specific workloads. To configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`, set `spark.comet.exec.replaceSortMergeJoin=true`. +## Coverage Threshold + +When Comet can only accelerate a small fraction of the operators in a query, the overhead from transitions between +Spark and Comet execution may outweigh the benefit of native execution. The `spark.comet.exec.coverageThreshold` +configuration allows you to set a minimum coverage fraction (a value between 0.0 and 1.0). 
If the fraction of eligible operators that Comet can convert +falls below this threshold, Comet will fall back to the original Spark plan for the entire query. + +For example, to require that at least 50% of eligible operators are converted before using Comet: + +``` +spark.comet.exec.coverageThreshold=0.5 +``` + +The default value is `0.0`, which disables the check and preserves the existing behavior of always using Comet when +possible. When this fallback is triggered, a warning is logged with the coverage statistics. + ## Shuffle Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries. From f1615e53121a6db7371df90d93638af3573d72d6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Mar 2026 15:09:34 -0600 Subject: [PATCH 3/3] refactor: deduplicate node classification and fix unit mismatch Extract classifyNode() to eliminate duplicated match logic between generateTreeString and collectStats. Replace coveragePercent with coverageFraction (0.0-1.0) to match the threshold config units and avoid unnecessary multiply/divide conversions. 
--- .../apache/comet/ExtendedExplainInfo.scala | 47 ++++++++----------- .../apache/comet/rules/CometExecRule.scala | 8 ++-- 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index 74af087a8f..6ad163bb4c 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -99,18 +99,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { outString: StringBuilder, planStats: CometCoverageStats): Unit = { - node match { - case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | - _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => - // ignore - case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | - _: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec => - planStats.transitions += 1 - case _: CometPlan => - planStats.cometOperators += 1 - case _ => - planStats.sparkOperators += 1 - } + planStats.classifyNode(node) outString.append(" " * indent) if (depth > 0) { @@ -184,36 +173,40 @@ class CometCoverageStats { def eligible: Int = sparkOperators + cometOperators - def coveragePercent: Double = - if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0 + def coverageFraction: Double = + if (eligible == 0) 0.0 else cometOperators.toDouble / eligible override def toString(): String = { s"Comet accelerated $cometOperators out of $eligible " + - s"eligible operators (${coveragePercent.toInt}%). " + + s"eligible operators (${(coverageFraction * 100).toInt}%). " + s"Final plan contains $transitions transitions between Spark and Comet." 
} -} -object CometCoverageStats { - def fromPlan(plan: SparkPlan): CometCoverageStats = { - val stats = new CometCoverageStats() - collectStats(getActualPlan(plan), stats) - stats - } - - private def collectStats(node: TreeNode[_], stats: CometCoverageStats): Unit = { + def classifyNode(node: TreeNode[_]): Unit = { node match { case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec | _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec => // ignore case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec | _: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec => - stats.transitions += 1 + transitions += 1 case _: CometPlan => - stats.cometOperators += 1 + cometOperators += 1 case _ => - stats.sparkOperators += 1 + sparkOperators += 1 } + } +} + +object CometCoverageStats { + def fromPlan(plan: SparkPlan): CometCoverageStats = { + val stats = new CometCoverageStats() + collectStats(getActualPlan(plan), stats) + stats + } + + private def collectStats(node: TreeNode[_], stats: CometCoverageStats): Unit = { + stats.classifyNode(node) node.innerChildren.foreach { case c: TreeNode[_] => collectStats(getActualPlan(c), stats) case _ => diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c2987fe962..b09a8b8e5c 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -481,11 +481,11 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { val threshold = CometConf.COMET_EXEC_COVERAGE_THRESHOLD.get(conf) if (threshold > 0.0) { val stats = CometCoverageStats.fromPlan(finalPlan) - val coverage = stats.coveragePercent / 100.0 - if (stats.eligible > 0 && coverage < threshold) { + if (stats.eligible > 0 && stats.coverageFraction < threshold) { logWarning( - s"Comet native coverage ${(coverage * 100).toInt}% is 
below threshold " + - s"${(threshold * 100).toInt}% ($stats). Falling back to Spark plan.") + s"Comet native coverage ${(stats.coverageFraction * 100).toInt}% " + + s"is below threshold ${(threshold * 100).toInt}% " + + s"($stats). Falling back to Spark plan.") return plan } }