diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1bd291389241aae460e46357813f719d2a41fa91..cd937257d31a8a2eb83e4c1fdf3ca9b3fcdc30cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -65,12 +65,15 @@ import org.apache.spark.util.Utils
 class SQLContext private[sql](
     @transient val sparkContext: SparkContext,
     @transient protected[sql] val cacheManager: CacheManager,
+    @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
   extends org.apache.spark.Logging with Serializable {
 
   self =>
 
-  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
+  def this(sparkContext: SparkContext) = {
+    this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+  }
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
   // If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
@@ -97,7 +100,7 @@ class SQLContext private[sql](
 
   /**
    * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext and CacheManager.
+   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
    *
    * @since 1.6.0
    */
@@ -105,6 +108,7 @@ class SQLContext private[sql](
     new SQLContext(
       sparkContext = sparkContext,
       cacheManager = cacheManager,
+      listener = listener,
       isRootContext = false)
   }
 
@@ -113,11 +117,6 @@ class SQLContext private[sql](
    */
   protected[sql] lazy val conf = new SQLConf
 
-  // `listener` should be only used in the driver
-  @transient private[sql] val listener = new SQLListener(this)
-  sparkContext.addSparkListener(listener)
-  sparkContext.ui.foreach(new SQLTab(this, _))
-
   /**
    * Set Spark SQL configuration properties.
    *
@@ -1312,4 +1311,14 @@ object SQLContext {
       ): InternalRow
     }
   }
+
+  /**
+   * Create a SQLListener, then add it to the SparkContext, and create an SQLTab if there is a SparkUI.
+   */
+  private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
+    val listener = new SQLListener(sc.conf)
+    sc.addSparkListener(listener)
+    sc.ui.foreach(new SQLTab(listener, _))
+    listener
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5779c71f64e9e5c7d0c9288ae937ff49b0208682..d6472400a6a21ff8953b49fc02ed276512d949ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,19 +19,15 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
 
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
+private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
-  private val retainedExecutions =
-    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 1000)
+  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
 
   private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 0b0867f67eb6e9fadc41b99d89a42e894c724dd4..9c27944d42fc6a72fd7d1df7095ddaf156f86d17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution.ui
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
 
   val parent = sparkUI
-  val listener = sqlContext.listener
 
   attachPage(new AllExecutionsPage(this))
   attachPage(new ExecutionPage(this))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 7a46c69a056b1b424b6269f0818114b472bfbbca..727cf3665a871e9af18997b348af0be24d5004e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -74,7 +74,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("basic") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
@@ -212,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -241,7 +241,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -281,7 +281,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobFailed)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ddeadd3eb737ddfa3658ba88915ae4ce9cc0107b..e620d7fb82af902a76f9c30a1c4e6bd48abc016f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,12 +40,13 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
@@ -88,12 +89,16 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
 class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
+    @transient listener: SQLListener,
     @transient execHive: ClientWrapper,
     @transient metaHive: ClientInterface,
-    isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
+    isRootContext: Boolean)
+  extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
   self =>
 
-  def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
+  def this(sc: SparkContext) = {
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
+  }
   def this(sc: JavaSparkContext) = this(sc.sc)
 
   import org.apache.spark.sql.hive.HiveContext._
@@ -109,6 +114,7 @@ class HiveContext private[hive](
     new HiveContext(
       sc = sc,
       cacheManager = cacheManager,
+      listener = listener,
       execHive = executionHive.newSession(),
       metaHive = metadataHive.newSession(),
       isRootContext = false)
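
Reviewer note (not part of the patch): a minimal sketch of how the session-sharing behaviour introduced above could be exercised. Because `listener` is `private[sql]`, the snippet assumes it is compiled inside the `org.apache.spark.sql` package; the object name `SharedListenerCheck` and the `local` master are illustrative only.

    package org.apache.spark.sql

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical check: a session obtained via newSession() should reuse the
    // root context's SQLListener instead of registering a second one.
    object SharedListenerCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("shared-listener-check"))
        val root = new SQLContext(sc)   // registers the SQLListener and SQL tab once
        val session = root.newSession() // shares SparkContext, CacheManager, SQLListener
        assert(root.listener eq session.listener)
        sc.stop()
      }
    }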
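Reviewer note (not part of the patch): since SQLListener now takes a SparkConf rather than a SQLContext, it can be constructed with no context at all, which is what the updated suite relies on. A sketch, assuming it sits somewhere under the `org.apache.spark.sql` package (the class is `private[sql]`); the retention value 50 is an arbitrary example:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.execution.ui.SQLListener

    // Retention is read from the conf at construction time; no SQLContext or
    // SparkContext is needed to unit-test the listener.
    val conf = new SparkConf().set("spark.sql.ui.retainedExecutions", "50")
    val listener = new SQLListener(conf)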