Skip to content
Snippets Groups Projects
Commit 2f9c7519 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors

StreamingContext.start() can throw an exception because DStream.validateAtStart() fails (say, the checkpoint directory is not set for a StateDStream). But by then the JobScheduler, JobGenerator, and ReceiverTracker have already started, along with their actors. Those cannot be shut down, because the only way to do so is to call StreamingContext.stop(), which cannot be called as the context has not been marked as ACTIVE.

The solution in this PR is to stop the internal scheduler if start() throws an exception, and to mark the context as STOPPED.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6559 from tdas/SPARK-7958 and squashes the following commits:

20b2ec1 [Tathagata Das] Added synchronized
790b617 [Tathagata Das] Handled exception in StreamingContext.start()
parent 90c60692
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} ...@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map import scala.collection.Map
import scala.collection.mutable.Queue import scala.collection.mutable.Queue
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.actor.{Props, SupervisorStrategy} import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
...@@ -576,18 +577,26 @@ class StreamingContext private[streaming] ( ...@@ -576,18 +577,26 @@ class StreamingContext private[streaming] (
def start(): Unit = synchronized { def start(): Unit = synchronized {
state match { state match {
case INITIALIZED => case INITIALIZED =>
validate()
startSite.set(DStream.getCreationSite()) startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get) sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive() StreamingContext.assertNoOtherContextIsActive()
scheduler.start() try {
uiTab.foreach(_.attach()) validate()
state = StreamingContextState.ACTIVE scheduler.start()
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this) StreamingContext.setActiveContext(this)
} }
shutdownHookRef = Utils.addShutdownHook( shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
uiTab.foreach(_.attach())
logInfo("StreamingContext started") logInfo("StreamingContext started")
case ACTIVE => case ACTIVE =>
logWarning("StreamingContext has already been started") logWarning("StreamingContext has already been started")
......
...@@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ...@@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
eventLoop.post(ErrorReported(msg, e)) eventLoop.post(ErrorReported(msg, e))
} }
/**
 * Whether this scheduler is considered started: a non-null event loop is
 * taken to mean `start()` has run and `stop()` has not yet cleared it.
 */
def isStarted(): Boolean = synchronized {
  Option(eventLoop).isDefined
}
private def processEvent(event: JobSchedulerEvent) { private def processEvent(event: JobSchedulerEvent) {
try { try {
event match { event match {
......
...@@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ...@@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(StreamingContext.getActive().isEmpty) assert(StreamingContext.getActive().isEmpty)
} }
test("start failure should stop internal components") {
  ssc = new StreamingContext(conf, batchDuration)
  val input = addInputStream(ssc)
  // A stateful operation without a checkpoint directory set makes
  // validation — and therefore start() — fail.
  val updateFunc = (newValues: Seq[Int], prevState: Option[Int]) => {
    Some(prevState.getOrElse(0) + newValues.sum)
  }
  input.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
  // start() must throw, and the failed context must end up STOPPED with
  // its internal scheduler shut down (no leaked actors).
  intercept[Exception] {
    ssc.start()
  }
  assert(ssc.getState() === StreamingContextState.STOPPED)
  assert(ssc.scheduler.isStarted === false)
}
test("start multiple times") { test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register() addInputStream(ssc).register()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment