From d0cc79ccd0b4500bd6b18184a723dabc164e8abd Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Tue, 13 Oct 2015 09:57:53 -0700
Subject: [PATCH] [SPARK-11030] [SQL] share the SQLTab across sessions

The SQLTab will be shared by multiple sessions. If we create multiple
independent SQLContexts (not using newSession()), we will still see
multiple SQLTabs in the Spark UI.

Author: Davies Liu <davies@databricks.com>

Closes #9048 from davies/sqlui.
---
 .../org/apache/spark/sql/SQLContext.scala        | 23 +++++++++++++------
 .../spark/sql/execution/ui/SQLListener.scala     | 10 +++-----
 .../spark/sql/execution/ui/SQLTab.scala          |  4 +---
 .../sql/execution/ui/SQLListenerSuite.scala      |  8 +++----
 .../apache/spark/sql/hive/HiveContext.scala      | 12 +++++++---
 5 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1bd2913892..cd937257d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -65,12 +65,15 @@ import org.apache.spark.util.Utils
 class SQLContext private[sql](
     @transient val sparkContext: SparkContext,
     @transient protected[sql] val cacheManager: CacheManager,
+    @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
   extends org.apache.spark.Logging with Serializable {
 
   self =>
 
-  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
+  def this(sparkContext: SparkContext) = {
+    this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+  }
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
   // If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
@@ -97,7 +100,7 @@ class SQLContext private[sql](
 
   /**
    * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext and CacheManager.
+   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
    *
    * @since 1.6.0
    */
@@ -105,6 +108,7 @@ class SQLContext private[sql](
     new SQLContext(
       sparkContext = sparkContext,
       cacheManager = cacheManager,
+      listener = listener,
       isRootContext = false)
   }
 
@@ -113,11 +117,6 @@ class SQLContext private[sql](
    */
   protected[sql] lazy val conf = new SQLConf
 
-  // `listener` should be only used in the driver
-  @transient private[sql] val listener = new SQLListener(this)
-  sparkContext.addSparkListener(listener)
-  sparkContext.ui.foreach(new SQLTab(this, _))
-
   /**
    * Set Spark SQL configuration properties.
    *
@@ -1312,4 +1311,14 @@ object SQLContext {
       ): InternalRow
     }
   }
+
+  /**
+   * Create a SQLListener, add it to the SparkContext, and create a SQLTab if there is a SparkUI.
+   */
+  private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
+    val listener = new SQLListener(sc.conf)
+    sc.addSparkListener(listener)
+    sc.ui.foreach(new SQLTab(listener, _))
+    listener
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5779c71f64..d6472400a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,19 +19,15 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
 
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
+private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
-  private val retainedExecutions =
-    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 1000)
+  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
 
   private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 0b0867f67e..9c27944d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution.ui
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
 
   val parent = sparkUI
-  val listener = sqlContext.listener
 
   attachPage(new AllExecutionsPage(this))
   attachPage(new ExecutionPage(this))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 7a46c69a05..727cf3665a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -74,7 +74,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("basic") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
@@ -212,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -241,7 +241,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -281,7 +281,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobFailed)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ddeadd3eb7..e620d7fb82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,12 +40,13 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
@@ -88,12 +89,16 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
 class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
+    @transient listener: SQLListener,
     @transient execHive: ClientWrapper,
     @transient metaHive: ClientInterface,
-    isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
+    isRootContext: Boolean)
+  extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
   self =>
 
-  def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
+  def this(sc: SparkContext) = {
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
+  }
   def this(sc: JavaSparkContext) = this(sc.sc)
 
   import org.apache.spark.sql.hive.HiveContext._
@@ -109,6 +114,7 @@ class HiveContext private[hive](
     new HiveContext(
       sc = sc,
       cacheManager = cacheManager,
+      listener = listener,
       execHive = executionHive.newSession(),
       metaHive = metadataHive.newSession(),
       isRootContext = false)
-- 
GitLab
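
Illustration (not part of the patch): a minimal sketch of the behavior this
change enables, assuming the Spark 1.6-era API; the object name
SharedSQLTabExample is made up. The root SQLContext goes through
SQLContext.createListenerAndUI, which registers one SQLListener and attaches
one SQL tab to the Spark UI, and newSession() reuses both.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SharedSQLTabExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("shared-sql-tab").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Root context: createListenerAndUI(sc) runs once here, registering a
    // single SQLListener with the SparkContext and attaching a single SQL tab.
    val rootContext = new SQLContext(sc)

    // Derived session: separate SQLConf and temporary tables, but the same
    // SparkContext, CacheManager, SQLListener and SQLTab; no second tab appears.
    val session = rootContext.newSession()

    // Executions from either context show up under the one shared SQL tab.
    rootContext.range(0, 100).count()
    session.range(0, 100).count()

    sc.stop()
  }
}

Since SQLListener now takes a SparkConf rather than a whole SQLContext, it no
longer depends on any particular session, which is what allows a single
listener and tab to serve every session on the same SparkContext.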