Skip to content
Snippets Groups Projects
Commit 1b465569 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start...

[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start multiple StreamingContexts in the same JVM

Currently, attempting to start a StreamingContext while another one is already started throws a confusing exception saying that the actor name JobScheduler is already registered. Instead, it's best to throw a proper exception as this is not supported.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5907 from tdas/SPARK-7361 and squashes the following commits:

fb81c4a [Tathagata Das] Fix typo
a9cd5bb [Tathagata Das] Added startSite to StreamingContext
5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7361
5870e2b [Tathagata Das] Added check for multiple streaming contexts
parent 4f8a1551
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.spark.streaming package org.apache.spark.streaming
import java.io.InputStream import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map import scala.collection.Map
import scala.collection.mutable.Queue import scala.collection.mutable.Queue
...@@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy} ...@@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.annotation.Experimental import org.apache.spark.annotation.Experimental
import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.input.FixedLengthBinaryInputFormat
...@@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD ...@@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.CallSite
/** /**
* Main entry point for Spark Streaming functionality. It provides methods used to create * Main entry point for Spark Streaming functionality. It provides methods used to create
...@@ -202,6 +204,8 @@ class StreamingContext private[streaming] ( ...@@ -202,6 +204,8 @@ class StreamingContext private[streaming] (
import StreamingContextState._ import StreamingContextState._
private[streaming] var state = Initialized private[streaming] var state = Initialized
private val startSite = new AtomicReference[CallSite](null)
/** /**
* Return the associated Spark context * Return the associated Spark context
*/ */
...@@ -518,6 +522,7 @@ class StreamingContext private[streaming] ( ...@@ -518,6 +522,7 @@ class StreamingContext private[streaming] (
* @throws SparkException if the context has already been started or stopped. * @throws SparkException if the context has already been started or stopped.
*/ */
def start(): Unit = synchronized { def start(): Unit = synchronized {
import StreamingContext._
if (state == Started) { if (state == Started) {
throw new SparkException("StreamingContext has already been started") throw new SparkException("StreamingContext has already been started")
} }
...@@ -525,10 +530,15 @@ class StreamingContext private[streaming] ( ...@@ -525,10 +530,15 @@ class StreamingContext private[streaming] (
throw new SparkException("StreamingContext has already been stopped") throw new SparkException("StreamingContext has already been stopped")
} }
validate() validate()
sparkContext.setCallSite(DStream.getCreationSite()) startSite.set(DStream.getCreationSite())
scheduler.start() sparkContext.setCallSite(startSite.get)
uiTab.foreach(_.attach()) ACTIVATION_LOCK.synchronized {
state = Started assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
state = Started
setActiveContext(this)
}
} }
/** /**
...@@ -603,6 +613,7 @@ class StreamingContext private[streaming] ( ...@@ -603,6 +613,7 @@ class StreamingContext private[streaming] (
uiTab.foreach(_.detach()) uiTab.foreach(_.detach())
// The state should always be Stopped after calling `stop()`, even if we haven't started yet: // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped state = Stopped
StreamingContext.setActiveContext(null)
} }
} }
...@@ -612,8 +623,29 @@ class StreamingContext private[streaming] ( ...@@ -612,8 +623,29 @@ class StreamingContext private[streaming] (
*/ */
object StreamingContext extends Logging { object StreamingContext extends Logging {
/**
* Lock that guards access to global variables that track active StreamingContext.
*/
private val ACTIVATION_LOCK = new Object()
private[streaming] val DEFAULT_CLEANER_TTL = 3600 private val activeContext = new AtomicReference[StreamingContext](null)
/**
 * Assert that no other StreamingContext has been started in this JVM.
 *
 * Takes `ACTIVATION_LOCK` before reading `activeContext`; the JVM monitor is
 * reentrant, so callers that already hold the lock may call this safely.
 *
 * @throws SparkException if another StreamingContext is active, with the call
 *                        site where that context was started included in the
 *                        message.
 */
private def assertNoOtherContextIsActive(): Unit = {
  ACTIVATION_LOCK.synchronized {
    if (activeContext.get() != null) {
      // Separate the prose from the multi-line call site: the original message
      // concatenated "started at" directly with the call site text.
      throw new SparkException(
        "Only one StreamingContext may be started in this JVM. " +
        "Currently running StreamingContext was started at:\n" +
        activeContext.get.startSite.get.longForm)
    }
  }
}
/**
 * Record `ssc` as the single active StreamingContext in this JVM
 * (pass `null` to clear it). Access is guarded by `ACTIVATION_LOCK`.
 */
private def setActiveContext(ssc: StreamingContext): Unit = {
  ACTIVATION_LOCK.synchronized { activeContext.set(ssc) }
}
@deprecated("Replaced by implicit functions in the DStream companion object. This is " + @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0") "kept here only for backward compatibility.", "1.3.0")
......
...@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
} }
} }
test("multiple streaming contexts") {
  // Start a first context so the JVM already has an active StreamingContext.
  sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  ssc = new StreamingContext(sc, Seconds(1))
  val stream = addInputStream(ssc)
  stream.foreachRDD { rdd => rdd.count }
  ssc.start()

  // Merely constructing a second streaming context must not raise any errors...
  val secondSsc = new StreamingContext(sc, Seconds(10))
  val secondStream = addInputStream(secondSsc)
  secondStream.foreachRDD { rdd => rdd.count }

  // ...but starting it while the first is running must fail with a clear message.
  val exception = intercept[SparkException] {
    secondSsc.start()
  }
  assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
}
test("DStream and generated RDD creation sites") { test("DStream and generated RDD creation sites") {
testPackage.test() testPackage.test()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment