From ec6f2a9774167014566fb9608ee4394d2ce5fd6a Mon Sep 17 00:00:00 2001 From: Tathagata Das <tathagata.das1565@gmail.com> Date: Tue, 12 May 2015 08:48:24 -0700 Subject: [PATCH] [SPARK-7532] [STREAMING] StreamingContext.start() made to logWarning and not throw exception Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6060 from tdas/SPARK-7532 and squashes the following commits: 6fe2e83 [Tathagata Das] Update docs 7dadfc3 [Tathagata Das] Fixed bug again 99c7678 [Tathagata Das] Added logInfo 65aec20 [Tathagata Das] Fix bug 5bf031b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7532 1a9a818 [Tathagata Das] Fix scaladoc c584313 [Tathagata Das] StreamingContext.start() made to logWarning and not throw exception --- .../spark/streaming/StreamingContext.scala | 27 +++++++++---------- .../streaming/StreamingContextSuite.scala | 4 +-- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 2c5834defa..8461e90120 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -528,28 +528,27 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. * - * @throws SparkException if the context has already been started or stopped. + * @throws SparkException if the StreamingContext is already stopped. */ def start(): Unit = synchronized { - import StreamingContext._ state match { case INITIALIZED => - // good to start + validate() + startSite.set(DStream.getCreationSite()) + sparkContext.setCallSite(startSite.get) + StreamingContext.ACTIVATION_LOCK.synchronized { + StreamingContext.assertNoOtherContextIsActive() + scheduler.start() + uiTab.foreach(_.attach()) + state = StreamingContextState.ACTIVE + StreamingContext.setActiveContext(this) + } + logInfo("StreamingContext started") case ACTIVE => - throw new SparkException("StreamingContext has already been started") + logWarning("StreamingContext has already been started") case STOPPED => throw new SparkException("StreamingContext has already been stopped") } - validate() - startSite.set(DStream.getCreationSite()) - sparkContext.setCallSite(startSite.get) - ACTIVATION_LOCK.synchronized { - assertNoOtherContextIsActive() - scheduler.start() - uiTab.foreach(_.attach()) - state = StreamingContextState.ACTIVE - setActiveContext(this) - } } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index b8247db7e8..47299513de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -136,9 +136,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w addInputStream(ssc).register() ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) - intercept[SparkException] { - ssc.start() - } + ssc.start() assert(ssc.getState() === StreamingContextState.ACTIVE) } -- GitLab