From afbe35cf5b272991b4986e551b42d9201c3862c3 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Wed, 8 Jun 2016 22:47:29 -0700
Subject: [PATCH] [SPARK-14670] [SQL] allow updating driver side sql metrics

## What changes were proposed in this pull request?

On the SparkUI right now we have this SQLTab that displays accumulator values per operator. However, it only displays metrics updated on the executors, not on the driver. It is useful to also include driver metrics, e.g. broadcast time.

This is a different version from https://github.com/apache/spark/pull/12427. This PR sends driver side accumulator updates right after the updating happens, not at the end of execution, by a new event.

## How was this patch tested?

new test in `SQLListenerSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13189 from cloud-fan/metrics.
---
 .../exchange/BroadcastExchangeExec.scala      |  9 +++
 .../spark/sql/execution/ui/SQLListener.scala  | 28 ++++++++--
 .../sql/execution/ui/SQLListenerSuite.scala   | 56 ++++++++++++++++++-
 3 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d3081ba7ac..bd0841db7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -92,6 +93,14 @@ case class BroadcastExchangeExec(
 
         val broadcasted = sparkContext.broadcast(relation)
         longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+        // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+        // directly without setting an execution id. We should be tolerant to it.
+        if (executionId != null) {
+          sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+            executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+        }
+
         broadcasted
       }
     }(BroadcastExchangeExec.executionContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 03b532664a..6e94791901 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -42,6 +42,10 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+  extends SparkListenerEvent
+
 private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
@@ -251,6 +255,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         }
       }
     }
+    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) => synchronized {
+      _executionIdToData.get(executionId).foreach { executionUIData =>
+        for ((accId, accValue) <- accumUpdates) {
+          executionUIData.driverAccumUpdates(accId) = accValue
+        }
+      }
+    }
     case _ => // Ignore
   }
 
@@ -296,7 +307,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
             (accumulatorUpdate._1, accumulatorUpdate._2)
           }
         }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
-        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
+
+        val driverUpdates = executionUIData.driverAccumUpdates.toSeq
+        mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
           executionUIData.accumulatorMetrics(accumulatorId).metricType)
       case None =>
         // This execution has been dropped
@@ -368,10 +381,15 @@ private[ui] class SQLExecutionUIData(
     val physicalPlanDescription: String,
     val physicalPlanGraph: SparkPlanGraph,
     val accumulatorMetrics: Map[Long, SQLPlanMetric],
-    val submissionTime: Long,
-    var completionTime: Option[Long] = None,
-    val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty,
-    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
+    val submissionTime: Long) {
+
+  var completionTime: Option[Long] = None
+
+  val jobs: mutable.HashMap[Long, JobExecutionStatus] = mutable.HashMap.empty
+
+  val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()
+
+  val driverAccumUpdates: mutable.HashMap[Long, Long] = mutable.HashMap.empty
 
   /**
    * Return whether there are running jobs in this execution.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6788c9d65f..6e60b0e4fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -23,11 +23,15 @@ import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
@@ -386,6 +390,52 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(trackedAccums.head === (sqlMetricInfo.id, sqlMetricInfo.update.get))
   }
 
+  test("driver side SQL metrics") {
+    val listener = new SQLListener(spark.sparkContext.conf)
+    val expectedAccumValue = 12345
+    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+    sqlContext.sparkContext.addSparkListener(listener)
+    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+      override lazy val sparkPlan = physicalPlan
+      override lazy val executedPlan = physicalPlan
+    }
+    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
+      physicalPlan.execute().collect()
+    }
+
+    def waitTillExecutionFinished(): Unit = {
+      while (listener.getCompletedExecutions.isEmpty) {
+        Thread.sleep(100)
+      }
+    }
+    waitTillExecutionFinished()
+
+    val driverUpdates = listener.getCompletedExecutions.head.driverAccumUpdates
+    assert(driverUpdates.size == 1)
+    assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
+  }
+
+}
+
+
+/**
+ * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
+ * on the driver.
+ */
+private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
+  override def sparkContext: SparkContext = sc
+  override def output: Seq[Attribute] = Seq()
+
+  override val metrics: Map[String, SQLMetric] = Map(
+    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+
+  override def doExecute(): RDD[InternalRow] = {
+    longMetric("dummy") += expectedValue
+    sc.listenerBus.post(SparkListenerDriverAccumUpdates(
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
+      metrics.values.map(m => m.id -> m.value).toSeq))
+    sc.emptyRDD
+  }
 }
-- 
GitLab
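For readers of this patch: the new `SparkListenerDriverAccumUpdates` event is posted on the listener bus like any other `SparkListenerEvent`, so it can also be observed outside `SQLListener` via `SparkListener.onOtherEvent`. Below is a minimal sketch of such a consumer, assuming the patch above is applied; the `DriverMetricsLogger` class name and the `println` reporting are illustrative only and not part of Spark.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Hypothetical listener that logs driver-side accumulator updates carried by the new event.
class DriverMetricsLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      // accumUpdates is a Seq[(Long, Long)] of (accumulator id, current value) pairs.
      accumUpdates.foreach { case (accumId, value) =>
        println(s"execution $executionId: driver accumulator $accumId = $value")
      }
    case _ => // ignore all other events
  }
}

// Usage (e.g. in application code):
//   spark.sparkContext.addSparkListener(new DriverMetricsLogger)
```

This mirrors how `SQLListener.onOtherEvent` handles the event in the patch: because `BroadcastExchangeExec` posts the update right after the broadcast completes rather than at the end of execution, such a listener can see driver-side metrics (e.g. broadcast time) while the query is still running.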