Skip to content
Snippets Groups Projects
Commit ec6f2a97 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6060 from tdas/SPARK-7532 and squashes the following commits:

6fe2e83 [Tathagata Das] Update docs
7dadfc3 [Tathagata Das] Fixed bug again
99c7678 [Tathagata Das] Added logInfo
65aec20 [Tathagata Das] Fix bug
5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532
1a9a818 [Tathagata Das] Fix scaladoc
c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception
parent f3e8e600
No related branches found
No related tags found
No related merge requests found
...@@ -528,28 +528,27 @@ class StreamingContext private[streaming] ( ...@@ -528,28 +528,27 @@ class StreamingContext private[streaming] (
/** /**
* Start the execution of the streams. * Start the execution of the streams.
* *
* @throws SparkException if the context has already been started or stopped. * @throws SparkException if the StreamingContext is already stopped.
*/ */
def start(): Unit = synchronized { def start(): Unit = synchronized {
import StreamingContext._
state match { state match {
case INITIALIZED => case INITIALIZED =>
// good to start validate()
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
state = StreamingContextState.ACTIVE
StreamingContext.setActiveContext(this)
}
logInfo("StreamingContext started")
case ACTIVE => case ACTIVE =>
throw new SparkException("StreamingContext has already been started") logWarning("StreamingContext has already been started")
case STOPPED => case STOPPED =>
throw new SparkException("StreamingContext has already been stopped") throw new SparkException("StreamingContext has already been stopped")
} }
validate()
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
ACTIVATION_LOCK.synchronized {
assertNoOtherContextIsActive()
scheduler.start()
uiTab.foreach(_.attach())
state = StreamingContextState.ACTIVE
setActiveContext(this)
}
} }
/** /**
......
...@@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ...@@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
addInputStream(ssc).register() addInputStream(ssc).register()
ssc.start() ssc.start()
assert(ssc.getState() === StreamingContextState.ACTIVE) assert(ssc.getState() === StreamingContextState.ACTIVE)
intercept[SparkException] { ssc.start()
ssc.start()
}
assert(ssc.getState() === StreamingContextState.ACTIVE) assert(ssc.getState() === StreamingContextState.ACTIVE)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment