Skip to content
Snippets Groups Projects
Commit 3cc2c053 authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-11540][SQL] API audit for QueryExecutionListener.

Author: Reynold Xin <rxin@databricks.com>

Closes #9509 from rxin/SPARK-11540.
parent 5e31db70
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution package org.apache.spark.sql.execution
import com.google.common.annotations.VisibleForTesting
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
...@@ -25,31 +27,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ...@@ -25,31 +27,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/** /**
* The primary workflow for executing relational queries using Spark. Designed to allow easy * The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers. * access to the intermediate phases of query execution for developers.
*
* While this is not a public class, we should avoid changing the function names for the sake of
* changing them, because a lot of developers use the feature for debugging.
*/ */
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
val analyzer = sqlContext.analyzer
val optimizer = sqlContext.optimizer
val planner = sqlContext.planner
val cacheManager = sqlContext.cacheManager
val prepareForExecution = sqlContext.prepareForExecution
def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) @VisibleForTesting
def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
lazy val analyzed: LogicalPlan = analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = { lazy val withCachedData: LogicalPlan = {
assertAnalyzed() assertAnalyzed()
cacheManager.useCachedData(analyzed) sqlContext.cacheManager.useCachedData(analyzed)
} }
lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
// TODO: Don't just pick the first one... // TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = { lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(sqlContext) SparkPlan.currentContext.set(sqlContext)
planner.plan(optimizedPlan).next() sqlContext.planner.plan(optimizedPlan).next()
} }
// executedPlan should not be used to initialize any SparkPlan. It should be // executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution. // only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */ /** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute() lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
...@@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { ...@@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
protected def stringOrError[A](f: => A): String = protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString } try f.toString catch { case e: Throwable => e.toString }
def simpleString: String = def simpleString: String = {
s"""== Physical Plan == s"""== Physical Plan ==
|${stringOrError(executedPlan)} |${stringOrError(executedPlan)}
""".stripMargin.trim """.stripMargin.trim
}
override def toString: String = { override def toString: String = {
def output = def output =
......
...@@ -19,36 +19,38 @@ package org.apache.spark.sql.util ...@@ -19,36 +19,38 @@ package org.apache.spark.sql.util
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.QueryExecution
/** /**
* :: Experimental ::
* The interface of query execution listener that can be used to analyze execution metrics. * The interface of query execution listener that can be used to analyze execution metrics.
* *
* Note that implementations should guarantee thread-safety as they will be used in a non * Note that implementations should guarantee thread-safety as they can be invoked by
* thread-safe way. * multiple different threads.
*/ */
@Experimental @Experimental
trait QueryExecutionListener { trait QueryExecutionListener {
/** /**
* A callback function that will be called when a query executed successfully. * A callback function that will be called when a query executed successfully.
* Implementations should guarantee thread-safe. * Note that this can be invoked by multiple different threads.
* *
* @param funcName the name of the action that triggered this query. * @param funcName name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan, * @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc. * physical plan, etc.
* @param duration the execution time for this query in nanoseconds. * @param durationNs the execution time for this query in nanoseconds.
*/ */
@DeveloperApi @DeveloperApi
def onSuccess(funcName: String, qe: QueryExecution, duration: Long) def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
/** /**
* A callback function that will be called when a query execution failed. * A callback function that will be called when a query execution failed.
* Implementations should guarantee thread-safe. * Note that this can be invoked by multiple different threads.
* *
* @param funcName the name of the action that triggered this query. * @param funcName the name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan, * @param qe the QueryExecution object that carries detail information like logical plan,
...@@ -56,34 +58,20 @@ trait QueryExecutionListener { ...@@ -56,34 +58,20 @@ trait QueryExecutionListener {
* @param exception the exception that failed this query. * @param exception the exception that failed this query.
*/ */
@DeveloperApi @DeveloperApi
def onFailure(funcName: String, qe: QueryExecution, exception: Exception) def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
} }
@Experimental
class ExecutionListenerManager extends Logging {
private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
private[this] val lock = new ReentrantReadWriteLock()
/** Acquires a read lock on the cache for the duration of `f`. */
private def readLock[A](f: => A): A = {
val rl = lock.readLock()
rl.lock()
try f finally {
rl.unlock()
}
}
/** Acquires a write lock on the cache for the duration of `f`. */ /**
private def writeLock[A](f: => A): A = { * :: Experimental ::
val wl = lock.writeLock() *
wl.lock() * Manager for [[QueryExecutionListener]]. See [[org.apache.spark.sql.SQLContext.listenerManager]].
try f finally { */
wl.unlock() @Experimental
} class ExecutionListenerManager private[sql] () extends Logging {
}
/** /**
* Registers the specified QueryExecutionListener. * Registers the specified [[QueryExecutionListener]].
*/ */
@DeveloperApi @DeveloperApi
def register(listener: QueryExecutionListener): Unit = writeLock { def register(listener: QueryExecutionListener): Unit = writeLock {
...@@ -91,7 +79,7 @@ class ExecutionListenerManager extends Logging { ...@@ -91,7 +79,7 @@ class ExecutionListenerManager extends Logging {
} }
/** /**
* Unregisters the specified QueryExecutionListener. * Unregisters the specified [[QueryExecutionListener]].
*/ */
@DeveloperApi @DeveloperApi
def unregister(listener: QueryExecutionListener): Unit = writeLock { def unregister(listener: QueryExecutionListener): Unit = writeLock {
...@@ -99,38 +87,59 @@ class ExecutionListenerManager extends Logging { ...@@ -99,38 +87,59 @@ class ExecutionListenerManager extends Logging {
} }
/** /**
* clears out all registered QueryExecutionListeners. * Removes all the registered [[QueryExecutionListener]].
*/ */
@DeveloperApi @DeveloperApi
def clear(): Unit = writeLock { def clear(): Unit = writeLock {
listeners.clear() listeners.clear()
} }
private[sql] def onSuccess( private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
funcName: String, readLock {
qe: QueryExecution, withErrorHandling { listener =>
duration: Long): Unit = readLock { listener.onSuccess(funcName, qe, duration)
withErrorHandling { listener => }
listener.onSuccess(funcName, qe, duration)
} }
} }
private[sql] def onFailure( private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
funcName: String, readLock {
qe: QueryExecution, withErrorHandling { listener =>
exception: Exception): Unit = readLock { listener.onFailure(funcName, qe, exception)
withErrorHandling { listener => }
listener.onFailure(funcName, qe, exception)
} }
} }
private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
/** A lock to prevent updating the list of listeners while we are traversing through them. */
private[this] val lock = new ReentrantReadWriteLock()
private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
for (listener <- listeners) { for (listener <- listeners) {
try { try {
f(listener) f(listener)
} catch { } catch {
case e: Exception => logWarning("error executing query execution listener", e) case NonFatal(e) => logWarning("Error executing query execution listener", e)
} }
} }
} }
/** Acquires a read lock on the cache for the duration of `f`. */
private def readLock[A](f: => A): A = {
val rl = lock.readLock()
rl.lock()
try f finally {
rl.unlock()
}
}
/** Acquires a write lock on the cache for the duration of `f`. */
private def writeLock[A](f: => A): A = {
val wl = lock.writeLock()
wl.lock()
try f finally {
wl.unlock()
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment