diff --git a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala index b0dd2fdda..ee3c9a532 100644 --- a/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala +++ b/auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala @@ -23,3 +23,11 @@ import org.apache.spark.scheduler.SparkListenerEvent sealed trait AuronEvent extends SparkListenerEvent {} case class AuronBuildInfoEvent(info: mutable.LinkedHashMap[String, String]) extends AuronEvent {} + +case class AuronPlanFallbackEvent( + executionId: Long, + numAuronNodes: Int, + numFallbackNodes: Int, + physicalPlanDescription: String, + fallbackNodeToReason: Map[String, String]) + extends AuronEvent {} diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala index 840abca67..3c66246d4 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala @@ -16,10 +16,18 @@ */ package org.apache.spark.sql.execution.ui -import scala.xml.{Node, NodeSeq} +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.xml.{Node, NodeSeq, Unparsed} import org.apache.spark.internal.Logging -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} +import org.apache.spark.util.Utils import org.apache.auron.sparkver @@ -29,12 +37,22 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage( @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5") override def render(request: javax.servlet.http.HttpServletRequest): Seq[Node] = { - UIUtils.headerSparkPage(request, "Auron", buildInfoSummary(sqlStore.buildInfo()), parent) + UIUtils.headerSparkPage( + request, + "Auron", + buildInfoSummary(sqlStore.buildInfo()) ++ + buildExecutionsListSummary(sqlStore.executionsList(), request), + parent) } @sparkver("4.0 / 4.1") override def render(request: jakarta.servlet.http.HttpServletRequest): Seq[Node] = { - UIUtils.headerSparkPage(request, "Auron", buildInfoSummary(sqlStore.buildInfo()), parent) + UIUtils.headerSparkPage( + request, + "Auron", + buildInfoSummary(sqlStore.buildInfo()) ++ + buildExecutionsListSummary(sqlStore.executionsList(), request), + parent) } private def propertyHeader = Seq("Name", "Value") @@ -77,4 +95,376 @@ private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage( } } + @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5") + private def buildExecutionsListSummary( + executionsList: Seq[AuronSQLExecutionUIData], + request: javax.servlet.http.HttpServletRequest): NodeSeq = { + val content = { + val _content = mutable.ListBuffer[Node]() + val executionPage = + Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1) + + val auronPageTable = + try { + new AuronExecutionPagedTable( + request, + parent, + executionsList, + "auron", + "auron", + UIUtils.prependBaseUri(request, parent.basePath), + "auron").table(executionPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering execution table:

+
+              {Utils.exceptionString(e)}
+            
+
+ } + + _content ++= + +

+ + + Queries: + {executionsList.size} +

+
++ +
+ {auronPageTable} +
+ + _content + } + content ++= + + } + + @sparkver("4.0 / 4.1") + private def buildExecutionsListSummary( + executionsList: Seq[AuronSQLExecutionUIData], + request: jakarta.servlet.http.HttpServletRequest): NodeSeq = { + val content = { + val _content = mutable.ListBuffer[Node]() + val executionPage = + Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1) + + val auronPageTable = + try { + new AuronExecutionPagedTable( + request, + parent, + executionsList, + "auron", + "auron", + UIUtils.prependBaseUri(request, parent.basePath), + "auron").table(executionPage) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering execution table:

+
+                {Utils.exceptionString(e)}
+              
+
+ } + + _content ++= + +

+ + + Queries: + {executionsList.size} +

+
++ +
+ {auronPageTable} +
+ + _content + } + content ++= + + } + +} + +private[ui] class AuronExecutionPagedTable( + request: HttpServletRequest, + parent: AuronSQLTab, + data: Seq[AuronSQLExecutionUIData], + tableHeaderId: String, + executionTag: String, + basePath: String, + subPath: String) + extends PagedTable[AuronExecutionTableRowData] { + + private val (sortColumn, desc, pageSize) = getAuronTableParameters(request, executionTag, "ID") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + override val dataSource = new AuronExecutionDataSource(data, pageSize, sortColumn, desc) + + private val parameterPath = + s"$basePath/$subPath/?${getAuronParameterOtherTable(request, executionTag)}" + + override def tableId: String = s"$executionTag-table" + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$executionTag.sort=$encodedSortColumn" + + s"&$executionTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableHeaderId" + } + + /** + * Returns parameters of other tables in the page. + */ + def getAuronParameterOtherTable(request: HttpServletRequest, tableTag: String): String = { + request.getParameterMap.asScala + .filterNot(_._1.startsWith(tableTag)) + .map(parameter => parameter._1 + "=" + parameter._2(0)) + .mkString("&") + } + + /** + * Returns parameter of this table. + */ + def getAuronTableParameters( + request: HttpServletRequest, + tableTag: String, + defaultSortColumn: String): (String, Boolean, Int) = { + val parameterSortColumn = request.getParameter(s"$tableTag.sort") + val parameterSortDesc = request.getParameter(s"$tableTag.desc") + val parameterPageSize = request.getParameter(s"$tableTag.pageSize") + val sortColumn = Option(parameterSortColumn) + .map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + } + .getOrElse(defaultSortColumn) + val desc = + Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn == defaultSortColumn) + val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + + (sortColumn, desc, pageSize) + } + + override def pageSizeFormField: String = s"$executionTag.pageSize" + + override def pageNumberFormField: String = s"$executionTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId" + + // Information for each header: title, sortable, tooltip + private val headerInfo: Seq[(String, Boolean, Option[String])] = { + Seq( + ("ID", true, None), + ("Description", true, None), + ("Num Auron Nodes", true, None), + ("Num Fallback Nodes", true, None)) + } + + override def headers: Seq[Node] = { + isAuronSortColumnValid(headerInfo, sortColumn) + + headerAuronRow( + headerInfo, + desc, + pageSize, + sortColumn, + parameterPath, + executionTag, + tableHeaderId) + } + + def headerAuronRow( + headerInfo: Seq[(String, Boolean, Option[String])], + desc: Boolean, + pageSize: Int, + sortColumn: String, + parameterPath: String, + tableTag: String, + headerId: String): Seq[Node] = { + val row: Seq[Node] = { + headerInfo.map { case (header, sortable, tooltip) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.desc=${!desc}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + + {header} {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$headerId") + + + + + {header} + + + + } else { + + + {header} + + + } + } + } + } + + + {row} + + + } + + def isAuronSortColumnValid( + headerInfo: Seq[(String, Boolean, Option[String])], + sortColumn: String): Unit = { + if (!headerInfo.filter(_._2).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + } + + override def row(executionTableRow: AuronExecutionTableRowData): Seq[Node] = { + val executionUIData = executionTableRow.executionUIData + + + + {executionUIData.executionId.toString} + + + {descriptionCell(executionUIData)} + + + {executionUIData.numAuronNodes.toString} + + + {executionUIData.numFallbackNodes.toString} + + + } + + private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] = { + val details = if (execution.description != null && execution.description.nonEmpty) { + val concat = new PlanStringConcat() + concat.append("== Fallback Summary ==\n") + val fallbackSummary = execution.fallbackNodeToReason + .map { case (name, reason) => + val id = name.substring(0, 3) + val nodeName = name.substring(4) + s"(${id.toInt}) $nodeName: $reason" + } + .mkString("\n") + concat.append(fallbackSummary) + if (execution.fallbackNodeToReason.isEmpty) { + concat.append("No fallback nodes") + } + concat.append("\n\n") + concat.append(execution.fallbackDescription) + + + +details + ++ + + } else { + Nil + } + + val desc = if (execution.description != null && execution.description.nonEmpty) { + + {execution.description} + } else { + {execution.executionId} + } + +
{desc}{details}
+ } + + private def executionURL(executionID: Long): String = + s"${UIUtils.prependBaseUri(request, parent.basePath)}/SQL/execution/?id=$executionID" +} + +private[ui] class AuronExecutionTableRowData(val executionUIData: AuronSQLExecutionUIData) + +private[ui] class AuronExecutionDataSource( + executionData: Seq[AuronSQLExecutionUIData], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[AuronExecutionTableRowData](pageSize) { + + // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show + // in the table so that we can avoid creating duplicate contents during sorting the data + private val data = executionData.map(executionRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[AuronExecutionTableRowData] = + data.slice(from, to) + + private def executionRow( + executionUIData: AuronSQLExecutionUIData): AuronExecutionTableRowData = { + new AuronExecutionTableRowData(executionUIData) + } + + /** Return Ordering according to sortColumn and desc. */ + private def ordering( + sortColumn: String, + desc: Boolean): Ordering[AuronExecutionTableRowData] = { + val ordering: Ordering[AuronExecutionTableRowData] = sortColumn match { + case "ID" => Ordering.by(_.executionUIData.executionId) + case "Description" => Ordering.by(_.executionUIData.fallbackDescription) + case "Num Auron Nodes" => Ordering.by(_.executionUIData.numAuronNodes) + case "Num Fallback Nodes" => Ordering.by(_.executionUIData.numFallbackNodes) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } } diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala index 651234ac5..48612c1dc 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala @@ -16,20 +16,27 @@ */ package org.apache.spark.sql.execution.ui -import scala.annotation.nowarn +import scala.collection.mutable import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} -import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS +import org.apache.spark.status.{ElementTrackingStore, KVUtils} -import org.apache.auron.spark.ui.AuronBuildInfoEvent +import org.apache.auron.spark.ui.{AuronBuildInfoEvent, AuronPlanFallbackEvent} -@nowarn("cat=unused") // conf temporarily unused class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore) extends SparkListener with Logging { + private val executionIdToDescription = new mutable.HashMap[Long, String] + private val executionIdToFallbackEvent = new mutable.HashMap[Long, AuronPlanFallbackEvent] + + kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get[Int](UI_RETAINED_EXECUTIONS)) { + count => cleanupExecutions(count) + } + def getAuronBuildInfo(): Long = { kvstore.count(classOf[AuronBuildInfoUIData]) } @@ -40,10 +47,61 @@ class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore) } override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionStart => onSQLExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onSQLExtensionEnd(e) case e: AuronBuildInfoEvent => onAuronBuildInfo(e) + case e: AuronPlanFallbackEvent => onAuronPlanFallback(e) case _ => // Ignore } + private def onAuronPlanFallback(event: AuronPlanFallbackEvent): Unit = { + val description = executionIdToDescription.get(event.executionId) + if (description.isDefined) { + val uiData = new AuronSQLExecutionUIData( + event.executionId, + description.get, + event.numAuronNodes, + event.numFallbackNodes, + event.physicalPlanDescription, + event.fallbackNodeToReason.toSeq.sortBy(_._1)) + kvstore.write(uiData) + } else { + executionIdToFallbackEvent.put(event.executionId, event.copy()) + } + } + + private def onSQLExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + val fallbackEvent = executionIdToFallbackEvent.get(event.executionId) + if (fallbackEvent.isDefined) { + val uiData = new AuronSQLExecutionUIData( + fallbackEvent.get.executionId, + event.description, + fallbackEvent.get.numAuronNodes, + fallbackEvent.get.numFallbackNodes, + fallbackEvent.get.physicalPlanDescription, + fallbackEvent.get.fallbackNodeToReason.toSeq.sortBy(_._1)) + kvstore.write(uiData) + executionIdToFallbackEvent.remove(event.executionId) + } + executionIdToDescription.put(event.executionId, event.description) + } + + private def onSQLExtensionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + executionIdToDescription.remove(event.executionId) + executionIdToFallbackEvent.remove(event.executionId) + } + + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) + if (countToDelete <= 0) { + return + } + + val view = kvstore.view(classOf[AuronSQLExecutionUIData]).first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true) + toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId)) + } + } object AuronSQLAppStatusListener { def register(sc: SparkContext): Unit = { diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala index 0c44644ff..4bd220e78 100644 --- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala +++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.sql.execution.ui +import scala.jdk.CollectionConverters.asScalaIteratorConverter import scala.util.control.NonFatal import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.internal.Logging -import org.apache.spark.util.kvstore.{KVIndex, KVStore} +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.Utils +import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView} class AuronSQLAppStatusStore(store: KVStore) extends Logging { @@ -36,8 +39,41 @@ class AuronSQLAppStatusStore(store: KVStore) extends Logging { None } } + + private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = { + Utils.tryWithResource(view.closeableIterator())(iter => iter.asScala.toList) + } + + def executionsList(): Seq[AuronSQLExecutionUIData] = { + viewToSeq(store.view(classOf[AuronSQLExecutionUIData])) + } + + def executionsList(offset: Int, length: Int): Seq[AuronSQLExecutionUIData] = { + viewToSeq(store.view(classOf[AuronSQLExecutionUIData]).skip(offset).max(length)) + } + + def execution(executionId: Long): Option[AuronSQLExecutionUIData] = { + try { + Some(store.read(classOf[AuronSQLExecutionUIData], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def executionsCount(): Long = { + store.count(classOf[AuronSQLExecutionUIData]) + } } +@KVIndex("executionId") +class AuronSQLExecutionUIData( + @KVIndexParam val executionId: Long, + val description: String, + val numAuronNodes: Int, + val numFallbackNodes: Int, + val fallbackDescription: String, + val fallbackNodeToReason: Seq[(String, String)]) {} + class AuronBuildInfoUIData(val info: Seq[(String, String)]) { @JsonIgnore @KVIndex diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala index 864879f12..77b20140f 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import java.io.File import org.apache.spark.SparkConf -import org.apache.spark.sql.AuronQueryTest +import org.apache.spark.sql.{AuronQueryTest, Row} import org.apache.spark.sql.execution.ui.AuronSQLAppStatusListener import org.apache.spark.util.Utils @@ -49,4 +49,13 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite { assert(listener.getAuronBuildInfo() == 1) } + test("test convert table in spark UI ") { + withTable("t1") { + sql( + "create table t1 using parquet PARTITIONED BY (part) as select 1 as c1, 2 as c2, 'test test' as part") + val df = sql("select * from t1") + checkAnswer(df, Seq(Row(1, 2, "test test"))) + } + } + } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala new file mode 100644 index 000000000..0d4340794 --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron + +import java.util.Collections.newSetFromMap + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.auron.AuronConvertStrategy.neverConvertReasonTag +import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.{BaseSubqueryExec, InputAdapter, ReusedSubqueryExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.ExplainUtils.getOpId +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec} + +import org.apache.auron.sparkver + +object AuronExplainUtils { + private def generateOperatorIDs( + plan: QueryPlan[_], + startOperatorID: Int, + visited: java.util.Set[QueryPlan[_]], + reusedExchanges: ArrayBuffer[ReusedExchangeExec], + addReusedExchanges: Boolean): Int = { + var currentOperationID = startOperatorID + if (plan.isInstanceOf[BaseSubqueryExec]) { + return currentOperationID + } + + def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) { + plan match { + case r: ReusedExchangeExec if addReusedExchanges => + reusedExchanges.append(r) + case _ => + } + visited.add(plan) + currentOperationID += 1 + plan.setTagValue(TreeNodeTag[Int]("operatorId"), currentOperationID) + } + + plan.foreachUp { + case _: WholeStageCodegenExec => + case _: InputAdapter => + case p: AdaptiveSparkPlanExec => + currentOperationID = generateOperatorIDs( + p.executedPlan, + currentOperationID, + visited, + reusedExchanges, + addReusedExchanges) + setOpId(p) + case p: QueryStageExec => + currentOperationID = generateOperatorIDs( + p.plan, + currentOperationID, + visited, + reusedExchanges, + addReusedExchanges) + setOpId(p) + case other: QueryPlan[_] => + setOpId(other) + currentOperationID = other.innerChildren.foldLeft(currentOperationID) { (curId, plan) => + generateOperatorIDs(plan, curId, visited, reusedExchanges, addReusedExchanges) + } + } + currentOperationID + } + + private def getSubqueries( + plan: => QueryPlan[_], + subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { + plan.foreach { + case a: AdaptiveSparkPlanExec => + getSubqueries(a.executedPlan, subqueries) + case q: QueryStageExec => + getSubqueries(q.plan, subqueries) + case p: SparkPlan => + p.expressions.foreach(_.collect { case e: PlanExpression[_] => + e.plan match { + case s: BaseSubqueryExec => + subqueries += ((p, e, s)) + getSubqueries(s, subqueries) + case _ => + } + }) + } + } + + private def processPlanSkippingSubqueries[T <: QueryPlan[T]]( + plan: T, + append: String => Unit): Unit = { + try { + + QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true) + + append("\n") + } catch { + case e: AnalysisException => append(e.toString) + } + } + + private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = { + var numAuronNodes = 0 + val fallbackNodeToReason = new mutable.HashMap[String, String] + + def collect(tmp: QueryPlan[_]): Unit = { + tmp.foreachUp { + case p: ExecutedCommandExec => + handleVanillaSparkPlan(p, fallbackNodeToReason) + case p: AdaptiveSparkPlanExec => + handleVanillaSparkPlan(p, fallbackNodeToReason) + collect(p.executedPlan) + case p: QueryStageExec => + handleVanillaSparkPlan(p, fallbackNodeToReason) + collect(p.plan) + case p: NativeSupports => + numAuronNodes += 1 + p.innerChildren.foreach(collect) + case p: SparkPlan => + handleVanillaSparkPlan(p, fallbackNodeToReason) + p.innerChildren.foreach(collect) + case _ => + } + } + + collect(plan) + (numAuronNodes, fallbackNodeToReason.toMap) + } + + def handleVanillaSparkPlan( + p: SparkPlan, + fallbackNodeToReason: mutable.HashMap[String, String]): Unit = { + if (p.getTagValue(neverConvertReasonTag).isDefined) { + addFallbackNodeWithReason(p, p.getTagValue(neverConvertReasonTag).get, fallbackNodeToReason) + } + } + + def addFallbackNodeWithReason( + p: SparkPlan, + reason: String, + fallbackNodeToReason: mutable.HashMap[String, String]): Unit = { + p.getTagValue(TreeNodeTag[Int]("operatorId")).foreach { opId => + // e.g., 002 project, it is used to help analysis by `substring(4)` + val formattedNodeName = f"$opId%03d ${p.nodeName}" + fallbackNodeToReason.put(formattedNodeName, reason) + } + } + + def processPlan[T <: QueryPlan[T]]( + plan: T, + append: String => Unit, + collectFallbackFunc: Option[QueryPlan[_] => (Int, Map[String, String])] = None) + : (Int, Map[String, String]) = synchronized { + try { + val operators = newSetFromMap[QueryPlan[_]](new java.util.IdentityHashMap()) + val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec] + + var currentOperatorID = 0 + currentOperatorID = + generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges, true) + + val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)] + getSubqueries(plan, subqueries) + + currentOperatorID = subqueries.foldLeft(currentOperatorID) { (curId, plan) => + generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges, true) + } + + val optimizedOutExchanges = ArrayBuffer.empty[Exchange] + reusedExchanges.foreach { reused => + val child = reused.child + if (!operators.contains(child)) { + optimizedOutExchanges.append(child) + currentOperatorID = + generateOperatorIDs(child, currentOperatorID, operators, reusedExchanges, false) + } + } + + processPlanSkippingSubqueries(plan, append) + + var i = 0 + for (sub <- subqueries) { + if (i == 0) { + append("\n===== Subqueries =====\n\n") + } + i = i + 1 + append( + s"Subquery:$i Hosting operator id = " + + s"${getOpId(sub._1)} Hosting Expression = ${sub._2}\n") + + if (!sub._3.isInstanceOf[ReusedSubqueryExec]) { + processPlanSkippingSubqueries(sub._3.child, append) + } + append("\n") + } + + i = 0 + optimizedOutExchanges.foreach { exchange => + if (i == 0) { + append("\n===== Adaptively Optimized Out Exchanges =====\n\n") + } + i = i + 1 + append(s"Subplan:$i\n") + processPlanSkippingSubqueries[SparkPlan](exchange, append) + append("\n") + } + + (subqueries.filter(!_._3.isInstanceOf[ReusedSubqueryExec]).map(_._3.child) :+ plan) + .map { plan => + if (collectFallbackFunc.isEmpty) { + collectFallbackNodes(plan) + } else { + collectFallbackFunc.get.apply(plan) + } + } + .reduce((a, b) => (a._1 + b._1, a._2 ++ b._2)) + } finally { + removeTags(plan) + } + } + + @sparkver("3.1/ 3.2 / 3.3/ 3.4/ 3.5") + private def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(TreeNodeTag[Int]("operatorId")) + children.foreach(removeTags) + } + + plan.foreach { + case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) + } + } + + @sparkver("3.0") + private def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(TreeNodeTag[Int]("operatorId")) + children.foreach(removeTags) + } + + plan.foreach { + case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, p.initialPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) + } + } +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index b68b04954..50022e0ae 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -21,12 +21,13 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.ColumnarRule -import org.apache.spark.sql.execution.LocalTableScanExec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat +import org.apache.spark.sql.execution.{ColumnarRule, LocalTableScanExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.ui.AuronEventUtils import org.apache.spark.sql.internal.SQLConf import org.apache.auron.spark.configuration.SparkAuronConfiguration +import org.apache.auron.spark.ui.AuronPlanFallbackEvent class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with Logging { Shims.get.initExtension() @@ -99,4 +100,36 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu } } } + + override def postColumnarTransitions: Rule[SparkPlan] = { + new Rule[SparkPlan] { + override def apply(sparkPlan: SparkPlan): SparkPlan = { + if (SparkEnv.get.conf + .get(SparkAuronConfiguration.UI_ENABLED.key, "true") + .equals("true")) { + val sc = sparkSession.sparkContext + val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + if (executionId == null) { + logDebug(s"Unknown execution id for plan: $sparkPlan") + return sparkPlan + } + val concat = new PlanStringConcat() + concat.append("== Physical Plan ==\n") + + val (numAuronNodes, fallbackNodeToReason) = + AuronExplainUtils.processPlan(sparkPlan, concat.append) + + val event = AuronPlanFallbackEvent( + executionId.toLong, + numAuronNodes, + fallbackNodeToReason.size, + concat.toString(), + fallbackNodeToReason) + AuronEventUtils.post(sc, event) + } + sparkPlan + } + } + + } }