Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,21 @@ case class CometScanExec(

// Driver-side metric values keyed by metric name; values recorded here are later
// copied into `metrics` and posted to the driver (see `setDriverMetrics` below).
private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has been
* initialized. See SPARK-26327 for more details.
*/
private def sendDriverMetrics(): Unit = {
driverMetrics.foreach(e => metrics(e._1).add(e._2))
/**
 * One-shot publication of the driver-side metrics: copies every value recorded in
 * `driverMetrics` into the matching SQL metric, then posts those metrics for the
 * current SQL execution. Because this is a lazy val, the body executes at most once
 * no matter how many times it is referenced.
 */
@transient private lazy val setDriverMetrics: Unit = {
  for ((name, value) <- driverMetrics) {
    metrics(name).set(value)
  }
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  val updatedMetrics =
    metrics.filter { case (name, _) => driverMetrics.contains(name) }.values.toSeq
  SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, updatedMetrics)
}

/**
 * Send the driver-side metrics. Before calling this function, selectedPartitions has been
 * initialized. See SPARK-26327 for more details.
 *
 * Delegates to the `setDriverMetrics` lazy val, so the metrics are posted at most once
 * even if this method is invoked multiple times (lazy val bodies evaluate only on first
 * access).
 */
private def sendDriverMetrics(): Unit = setDriverMetrics

// True when the expression tree contains a PlanExpression (a subquery-bearing node) —
// per the name, presumably used to identify dynamic partition pruning filter predicates.
private def isDynamicPruningFilter(e: Expression): Boolean =
e.find(_.isInstanceOf[PlanExpression[_]]).isDefined

Expand Down
32 changes: 32 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,38 @@ class CometExecSuite extends CometTestBase {
}
}

test("Native_datafusion reports correct files and bytes scanned") {
  withTempDir { dir =>
    val path = new java.io.File(dir, "test_metrics").getAbsolutePath
    // Two partitions => exactly two parquet files on disk, so numFiles must be 2.
    spark.range(100).repartition(2).write.mode("overwrite").parquet(path)

    withSQLConf(
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      // The test exercises the native_datafusion scan implementation, so it must be
      // selected explicitly (spark.comet.scan.impl); otherwise the default scan impl
      // is used and the assertion does not cover the intended code path.
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") {
      val df = spark.read.parquet(path)

      // Trigger two different actions to ensure metrics are not duplicated
      // (each action re-runs the scan; the reported file count must stay stable).
      df.count()
      df.collect()

      // Locate the Comet scan node in the executed plan; either the fully-native
      // scan or the hybrid scan may appear depending on plan translation.
      val scanNode = stripAQEPlan(df.queryExecution.executedPlan)
        .collectFirst {
          case n: org.apache.spark.sql.comet.CometNativeScanExec => n
          case n: org.apache.spark.sql.comet.CometScanExec => n
        }
        .getOrElse {
          fail(
            s"Comet scan node not found in the physical plan. Plan: \n${df.queryExecution.executedPlan}")
        }

      val numFiles = scanNode.metrics("numFiles").value
      assert(
        numFiles == 2,
        s"Expected exactly 2 files to be scanned, but got metrics reporting $numFiles")
    }
  }
}

}

case class BucketedTableTestSpec(
Expand Down
Loading