Commit 9dc5fa5f authored by Shixiong Zhu, committed by Tathagata Das

[SPARK-18796][SS] StreamingQueryManager should not block when starting a query


## What changes were proposed in this pull request?

Major change in this PR:
- Add `pendingQueryNames` and `pendingQueryIds` to track queries that are going to start but have not yet been put into `activeQueries`, so that we don't need to hold a lock while starting a query.
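
For illustration, a minimal sketch of the locking discipline the `startQuery` diff below arrives at: reserve the query's slot while holding the lock, run `start()` (which synchronously invokes user-supplied `StreamingQueryListener` callbacks) without the lock, and roll the reservation back on failure. `LockFreeStartSketch`, `QueryHandle`, and `registry` are illustrative names, not identifiers from the patch:

```scala
import java.util.UUID
import scala.collection.mutable

object LockFreeStartSketch {
  // Illustrative stand-in for StreamExecution: just an id and a start() method.
  trait QueryHandle { def id: UUID; def start(): Unit }

  private val registryLock = new Object
  private val registry = new mutable.HashMap[UUID, QueryHandle]

  def startQuery(query: QueryHandle): QueryHandle = {
    registryLock.synchronized {
      if (registry.contains(query.id)) {
        throw new IllegalStateException(s"Query ${query.id} is already active")
      }
      registry.put(query.id, query) // reserve the id while holding the lock
    }
    try {
      // start() can run arbitrary user listener code, so no lock is held here;
      // holding one would risk deadlock or block other starts for a long time.
      query.start()
    } catch {
      case e: Throwable =>
        registryLock.synchronized { registry -= query.id } // undo the reservation
        throw e
    }
    query
  }
}
```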

Minor changes:
- Fix a potential NPE when the user sets `checkpointLocation` using SQLConf but doesn't specify a query name (see the sketch after this list).
- Add missing docs in `StreamingQueryListener`
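
The NPE fix itself is the `userSpecifiedName.getOrElse(UUID.randomUUID().toString)` fallback in `createQuery` below: when the checkpoint root comes from `SQLConf` and the query has no name, a random UUID names the checkpoint directory instead of a null. A minimal sketch of that logic, assuming Hadoop's `Path` (the helper name `resolveCheckpointDir` is illustrative):

```scala
import java.util.UUID
import org.apache.hadoop.fs.Path

// Sketch of the fallback; `resolveCheckpointDir` is an illustrative name.
def resolveCheckpointDir(confLocation: String, userSpecifiedName: Option[String]): String = {
  // Before the fix, a null name was appended to the path when the query was
  // unnamed; defaulting to a random UUID avoids the potential NPE.
  new Path(confLocation, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
}
```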

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16220 from zsxwing/SPARK-18796.

(cherry picked from commit 417e45c5)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 1aeb7f42
@@ -223,7 +223,8 @@ class StreamExecution(
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
-      postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.
+      // `postEvent` does not throw non fatal exception.
+      postEvent(new QueryStartedEvent(id, runId, name))
       // Unblock starting thread
       startLatch.countDown()
@@ -286,7 +287,7 @@ class StreamExecution(
           e,
           committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
           availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
-        logError(s"Query $name terminated with error", e)
+        logError(s"Query $prettyIdString terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
@@ -83,6 +83,9 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing the start of a query
+   * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+   * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+   * @param name User-specified name of the query, null if not specified.
    * @since 2.1.0
    */
   @Experimental
@@ -94,6 +97,7 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing any progress updates in a query.
+   * @param progress The query progress updates.
    * @since 2.1.0
    */
   @Experimental
@@ -103,7 +107,8 @@ object StreamingQueryListener {
    * :: Experimental ::
    * Event representing that termination of a query.
    *
-   * @param id The query id.
+   * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+   * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
    * @param exception The exception message of the query if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
    * @since 2.1.0
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming

 import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy

 import scala.collection.mutable
@@ -44,10 +44,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+
+  @GuardedBy("activeQueriesLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
   private val activeQueriesLock = new Object
   private val awaitTerminationLock = new Object

+  @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null

   /**
@@ -181,8 +184,65 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     listenerBus.post(event)
   }

+  private def createQuery(
+      userSpecifiedName: Option[String],
+      userSpecifiedCheckpointLocation: Option[String],
+      df: DataFrame,
+      sink: Sink,
+      outputMode: OutputMode,
+      useTempCheckpointLocation: Boolean,
+      recoverFromCheckpointLocation: Boolean,
+      trigger: Trigger,
+      triggerClock: Clock): StreamExecution = {
+    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+      new Path(userSpecified).toUri.toString
+    }.orElse {
+      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
+        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+      }
+    }.getOrElse {
+      if (useTempCheckpointLocation) {
+        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+      } else {
+        throw new AnalysisException(
+          "checkpointLocation must be specified either " +
+            """through option("checkpointLocation", ...) or """ +
+            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+      }
+    }
+
+    // If offsets have already been created, we trying to resume a query.
+    if (!recoverFromCheckpointLocation) {
+      val checkpointPath = new Path(checkpointLocation, "offsets")
+      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+      if (fs.exists(checkpointPath)) {
+        throw new AnalysisException(
+          s"This query does not support recovering from checkpoint location. " +
+            s"Delete $checkpointPath to start over.")
+      }
+    }
+
+    val analyzedPlan = df.queryExecution.analyzed
+    df.queryExecution.assertAnalyzed()
+
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+    }
+
+    new StreamExecution(
+      sparkSession,
+      userSpecifiedName.orNull,
+      checkpointLocation,
+      analyzedPlan,
+      sink,
+      trigger,
+      triggerClock,
+      outputMode)
+  }
+
   /**
    * Start a [[StreamingQuery]].
    *
+   * @param userSpecifiedName Query name optionally specified by the user.
+   * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
+   * @param df Streaming DataFrame.
@@ -206,72 +266,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       recoverFromCheckpointLocation: Boolean = true,
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
-    activeQueriesLock.synchronized {
-      val name = userSpecifiedName match {
-        case Some(n) =>
-          if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
-            throw new IllegalArgumentException(
-              s"Cannot start query with name $n as a query with that name is already active")
-          }
-          n
-        case None => null
-      }
-      val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-        new Path(userSpecified).toUri.toString
-      }.orElse {
-        df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-          new Path(location, name).toUri.toString
-        }
-      }.getOrElse {
-        if (useTempCheckpointLocation) {
-          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        } else {
-          throw new AnalysisException(
-            "checkpointLocation must be specified either " +
-              """through option("checkpointLocation", ...) or """ +
-              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
-        }
-      }
+    val query = createQuery(
+      userSpecifiedName,
+      userSpecifiedCheckpointLocation,
+      df,
+      sink,
+      outputMode,
+      useTempCheckpointLocation,
+      recoverFromCheckpointLocation,
+      trigger,
+      triggerClock)
-      // If offsets have already been created, we trying to resume a query.
-      if (!recoverFromCheckpointLocation) {
-        val checkpointPath = new Path(checkpointLocation, "offsets")
-        val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-        if (fs.exists(checkpointPath)) {
-          throw new AnalysisException(
-            s"This query does not support recovering from checkpoint location. " +
-              s"Delete $checkpointPath to start over.")
-        }
-      }
+    activeQueriesLock.synchronized {
+      // Make sure no other query with same name is active
+      userSpecifiedName.foreach { name =>
+        if (activeQueries.values.exists(_.name == name)) {
+          throw new IllegalArgumentException(
+            s"Cannot start query with name $name as a query with that name is already active")
+        }
+      }
-      val analyzedPlan = df.queryExecution.analyzed
-      df.queryExecution.assertAnalyzed()
-      if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
-        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-      }
-      val query = new StreamExecution(
-        sparkSession,
-        name,
-        checkpointLocation,
-        analyzedPlan,
-        sink,
-        trigger,
-        triggerClock,
-        outputMode)
       // Make sure no other query with same id is active
       if (activeQueries.values.exists(_.id == query.id)) {
         throw new IllegalStateException(
           s"Cannot start query with id ${query.id} as another query with same id is " +
-            s"already active. Perhaps you are attempting to restart a query from checkpoint" +
+            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
             s"that is already active.")
       }
-      query.start()
       activeQueries.put(query.id, query)
-      query
     }
+    try {
+      // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
+      // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
+      // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
+      // time to finish.
+      query.start()
+    } catch {
+      case e: Throwable =>
+        activeQueriesLock.synchronized {
+          activeQueries -= query.id
        }
+        throw e
+    }
+    query
   }

   /** Notify (by the StreamingQuery) that the query has been terminated */
@@ -27,6 +27,7 @@
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
 import org.apache.spark.sql.types._
@@ -575,4 +576,59 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       sq.stop()
     }
   }
+
+  test("user specified checkpointLocation precedes SQLConf") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withTempPath { userCheckpointPath =>
+        assert(!userCheckpointPath.exists(), s"$userCheckpointPath should not exist")
+        withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+          val queryName = "test_query"
+          val ds = MemoryStream[Int].toDS
+          ds.writeStream
+            .format("memory")
+            .queryName(queryName)
+            .option("checkpointLocation", userCheckpointPath.getAbsolutePath)
+            .start()
+            .stop()
+          assert(checkpointPath.listFiles().isEmpty,
+            "SQLConf path is used even if user specified checkpointLoc: " +
+              s"${checkpointPath.listFiles()} is not empty")
+          assert(userCheckpointPath.exists(),
+            s"The user specified checkpointLoc ($userCheckpointPath) is not created")
+        }
+      }
+    }
+  }
+
+  test("use SQLConf checkpoint dir when checkpointLocation is not specified") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+        val queryName = "test_query"
+        val ds = MemoryStream[Int].toDS
+        ds.writeStream.format("memory").queryName(queryName).start().stop()
+        // Should use query name to create a folder in `checkpointPath`
+        val queryCheckpointDir = new File(checkpointPath, queryName)
+        assert(queryCheckpointDir.exists(), s"$queryCheckpointDir doesn't exist")
+        assert(
+          checkpointPath.listFiles().size === 1,
+          s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+      }
+    }
+  }
+
+  test("use SQLConf checkpoint dir when checkpointLocation is not specified without query name") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+        val ds = MemoryStream[Int].toDS
+        ds.writeStream.format("console").start().stop()
+        // Should create a random folder in `checkpointPath`
+        assert(
+          checkpointPath.listFiles().size === 1,
+          s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+      }
+    }
+  }
 }