Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

// Minimum fraction (0.0 to 1.0) of eligible operators that must be converted to
// Comet-native operators for the native plan to be kept; 0.0 (the default)
// disables the check. Values outside [0.0, 1.0] are rejected by checkValue.
val COMET_EXEC_COVERAGE_THRESHOLD: ConfigEntry[Double] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.coverageThreshold")
.category(CATEGORY_EXEC)
.doc(
"Minimum fraction of eligible operators that must be converted to Comet native " +
"operators for the native plan to be used. If the coverage is below this threshold, " +
"Comet falls back to the original Spark plan. A value of 0.0 disables this check. " +
"For example, 0.5 means at least 50% of eligible operators must be Comet-accelerated.")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0, "Coverage threshold must be between 0.0 and 1.0")
.createWithDefault(0.0)

val COMET_EXEC_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("project", defaultValue = true)
val COMET_EXEC_FILTER_ENABLED: ConfigEntry[Boolean] =
Expand Down
16 changes: 16 additions & 0 deletions docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ to test with both for your specific workloads.

To configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`, set `spark.comet.exec.replaceSortMergeJoin=true`.

## Coverage Threshold

When Comet can only accelerate a small fraction of the operators in a query, the overhead from transitions between
Spark and Comet execution may outweigh the benefit of native execution. The `spark.comet.exec.coverageThreshold`
configuration lets you set a minimum coverage fraction (a value between `0.0` and `1.0`). If the fraction of
operators that Comet can convert falls below this threshold, Comet falls back to the original Spark plan for the
entire query.

For example, to require that at least 50% of eligible operators are converted before using Comet:

```
spark.comet.exec.coverageThreshold=0.5
```

The default value is `0.0`, which disables the check and preserves the existing behavior of always using Comet when
possible. When the threshold is triggered, a warning is logged with the coverage statistics.

## Shuffle

Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.
Expand Down
58 changes: 42 additions & 16 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
outString: StringBuilder,
planStats: CometCoverageStats): Unit = {

node match {
case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
_: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
// ignore
case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
_: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec =>
planStats.transitions += 1
case _: CometPlan =>
planStats.cometOperators += 1
case _ =>
planStats.sparkOperators += 1
}
planStats.classifyNode(node)

outString.append(" " * indent)
if (depth > 0) {
Expand Down Expand Up @@ -182,14 +171,51 @@ class CometCoverageStats {
var cometOperators: Int = 0
var transitions: Int = 0

/** Total count of operators that could potentially be Comet-accelerated. */
def eligible: Int = cometOperators + sparkOperators

/** Fraction of eligible operators converted to Comet; 0.0 when nothing is eligible. */
def coverageFraction: Double = eligible match {
  case 0 => 0.0
  case total => cometOperators.toDouble / total
}

/** Human-readable summary of coverage and transition counts for log messages. */
override def toString(): String = {
  // Truncate (not round) the percentage, matching the integer display elsewhere.
  val convertedPct = (coverageFraction * 100).toInt
  s"Comet accelerated $cometOperators out of $eligible " +
    s"eligible operators ($convertedPct%). " +
    s"Final plan contains $transitions transitions between Spark and Comet."
}

// Tallies a single plan node into the coverage counters.
// NOTE(review): case order appears significant — the Comet transition operators
// (e.g. CometColumnarToRowExec) presumably also extend CometPlan, so they must be
// matched before the generic CometPlan case; confirm before reordering.
def classifyNode(node: TreeNode[_]): Unit = {
node match {
// AQE/codegen wrapper nodes: bookkeeping only, counted as neither Spark nor Comet.
case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
_: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
// ignore
// Row<->columnar conversion nodes mark a transition between Spark and Comet execution.
case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
_: CometNativeColumnarToRowExec | _: CometSparkToColumnarExec =>
transitions += 1
// Any Comet-native operator counts toward coverage.
case _: CometPlan =>
cometOperators += 1
// Everything else is an eligible Spark operator that was not converted.
case _ =>
sparkOperators += 1
}
}
}

// Companion helpers for computing coverage statistics over an entire physical plan.
object CometCoverageStats {
/**
 * Walks the given plan tree and returns accumulated Comet/Spark/transition counts.
 * The plan is unwrapped via getActualPlan before traversal.
 */
def fromPlan(plan: SparkPlan): CometCoverageStats = {
val stats = new CometCoverageStats()
collectStats(getActualPlan(plan), stats)
stats
}

// Recursively classifies `node` and then every node reachable through both
// innerChildren and children, unwrapping each child with getActualPlan first.
// The non-TreeNode catch-all arms silently skip any other child element kinds.
private def collectStats(node: TreeNode[_], stats: CometCoverageStats): Unit = {
stats.classifyNode(node)
node.innerChildren.foreach {
case c: TreeNode[_] => collectStats(getActualPlan(c), stats)
case _ =>
}
node.children.foreach {
case c: TreeNode[_] => collectStats(getActualPlan(c), stats)
case _ =>
}
}
}

object CometExplainInfo {
Expand Down
20 changes: 18 additions & 2 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
Expand Down Expand Up @@ -444,7 +444,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

// Convert native execution block by linking consecutive native operators.
var firstNativeOp = true
newPlan.transformDown {
val finalPlan = newPlan.transformDown {
case op: CometNativeExec =>
val newPlan = if (firstNativeOp) {
firstNativeOp = false
Expand Down Expand Up @@ -475,6 +475,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
firstNativeOp = true
op
}

// Check coverage threshold - if the fraction of converted operators is too low,
// fall back to the original Spark plan.
val threshold = CometConf.COMET_EXEC_COVERAGE_THRESHOLD.get(conf)
if (threshold > 0.0) {
val stats = CometCoverageStats.fromPlan(finalPlan)
if (stats.eligible > 0 && stats.coverageFraction < threshold) {
logWarning(
s"Comet native coverage ${(stats.coverageFraction * 100).toInt}% " +
s"is below threshold ${(threshold * 100).toInt}% " +
s"($stats). Falling back to Spark plan.")
return plan
}
}

finalPlan
}
}

Expand Down
Loading