diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index d59146e069d22847dbfa81a40c6b0219ab9d5ad4..fa5f0e81ddafcedc77e01e2f69332dc2250a622f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -354,17 +354,17 @@ abstract class DStream[T: ClassTag] (
    * this method to save custom checkpoint data.
    */
   private[streaming] def updateCheckpointData(currentTime: Time) {
-    logInfo("Updating checkpoint data for time " + currentTime)
+    logDebug("Updating checkpoint data for time " + currentTime)
     checkpointData.update(currentTime)
     dependencies.foreach(_.updateCheckpointData(currentTime))
     logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
   }
 
   private[streaming] def clearCheckpointData(time: Time) {
-    logInfo("Clearing checkpoint data")
+    logDebug("Clearing checkpoint data")
     checkpointData.cleanup(time)
     dependencies.foreach(_.clearCheckpointData(time))
-    logInfo("Cleared checkpoint data")
+    logDebug("Cleared checkpoint data")
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index caed1b3755ce1921123a221ca7089aed51d57a0a..b5f11d344068d828330377bfa37c251e2f208736 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import akka.actor.{Props, Actor}
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
+import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import org.apache.spark.{SparkException, SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 import scala.util.{Failure, Success, Try}
@@ -40,13 +39,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   private val ssc = jobScheduler.ssc
   private val graph = ssc.graph
-  private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-    def receive = {
-      case event: JobGeneratorEvent =>
-        logDebug("Got event of type " + event.getClass.getName)
-        processEvent(event)
-    }
-  }))
   val clock = {
     val clockClass = ssc.sc.conf.get(
       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
@@ -60,7 +52,23 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     null
   }
 
+  // eventActor is created when generator starts.
+  // This not being null means the scheduler has been started and not stopped
+  private var eventActor: ActorRef = null
+
+  /** Start generation of jobs */
   def start() = synchronized {
+    if (eventActor != null) {
+      throw new SparkException("JobGenerator already started")
+    }
+
+    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+      def receive = {
+        case event: JobGeneratorEvent =>
+          logDebug("Got event of type " + event.getClass.getName)
+          processEvent(event)
+      }
+    }), "JobGenerator")
     if (ssc.isCheckpointPresent) {
       restart()
     } else {
@@ -68,11 +76,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }
 
-  def stop() {
-    timer.stop()
-    if (checkpointWriter != null) checkpointWriter.stop()
-    ssc.graph.stop()
-    logInfo("JobGenerator stopped")
+  /** Stop generation of jobs */
+  def stop() = synchronized {
+    if (eventActor != null) {
+      timer.stop()
+      ssc.env.actorSystem.stop(eventActor)
+      if (checkpointWriter != null) checkpointWriter.stop()
+      ssc.graph.stop()
+      logInfo("JobGenerator stopped")
+    }
   }
 
   /**
@@ -172,4 +184,3 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }
 }
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 24d57548c35cd53988c9d12248b05e16533e5aa9..de675d3c7fb94b4983e1583ea782c4a29b8dce73 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -41,20 +41,26 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
   private val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   private val jobGenerator = new JobGenerator(this)
-  private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-    def receive = {
-      case event: JobSchedulerEvent => processEvent(event)
-    }
-  }))
-  val clock = jobGenerator.clock // used by testsuites
+  val clock = jobGenerator.clock
   val listenerBus = new StreamingListenerBus()
 
+  // These two are created only when scheduler starts.
+  // eventActor not being null means the scheduler has been started and not stopped
   var networkInputTracker: NetworkInputTracker = null
+  private var eventActor: ActorRef = null
+
 
   def start() = synchronized {
-    if (networkInputTracker != null) {
-      throw new SparkException("StreamingContext already started")
+    if (eventActor != null) {
+      throw new SparkException("JobScheduler already started")
     }
+
+    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+      def receive = {
+        case event: JobSchedulerEvent => processEvent(event)
+      }
+    }), "JobScheduler")
+    listenerBus.start()
     networkInputTracker = new NetworkInputTracker(ssc)
     networkInputTracker.start()
     Thread.sleep(1000)
@@ -63,13 +69,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def stop() = synchronized {
-    if (networkInputTracker != null) {
+    if (eventActor != null) {
       jobGenerator.stop()
       networkInputTracker.stop()
       executor.shutdown()
       if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
         executor.shutdownNow()
       }
+      listenerBus.stop()
+      ssc.env.actorSystem.stop(eventActor)
       logInfo("JobScheduler stopped")
     }
   }
@@ -104,7 +112,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       case e: Throwable =>
         reportError("Error in job scheduler", e)
     }
-
   }
 
   private def handleJobStart(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 34fb158205052dfba36d9858be75b993119dda09..0d9733fa69a12a67f386fa736f379aebfe06619c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -19,8 +19,7 @@ package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
 import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkException, Logging, SparkEnv}
 import org.apache.spark.SparkContext._
 
 import scala.collection.mutable.HashMap
@@ -32,6 +31,7 @@ import akka.pattern.ask
 import akka.dispatch._
 import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.util.AkkaUtils
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
@@ -39,7 +39,9 @@ private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], m
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 
 /**
- * This class manages the execution of the receivers of NetworkInputDStreams.
+ * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
+ * this class must be created after all input streams have been added and StreamingContext.start()
+ * has been called because it needs the final set of input streams at the time of instantiation.
  */
 private[streaming]
 class NetworkInputTracker(ssc: StreamingContext) extends Logging {
@@ -49,23 +51,33 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
   val receiverExecutor = new ReceiverExecutor()
   val receiverInfo = new HashMap[Int, ActorRef]
   val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
-  val timeout = 5000.milliseconds
+  val timeout = AkkaUtils.askTimeout(ssc.conf)
 
+
+  // actor is created when generator starts.
+  // This not being null means the tracker has been started and not stopped
+  var actor: ActorRef = null
   var currentTime: Time = null
 
   /** Start the actor and receiver execution thread. */
   def start() {
+    if (actor != null) {
+      throw new SparkException("NetworkInputTracker already started")
+    }
+
     if (!networkInputStreams.isEmpty) {
-      ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
       receiverExecutor.start()
+      logInfo("NetworkInputTracker started")
     }
   }
 
   /** Stop the receiver execution thread. */
   def stop() {
-    if (!networkInputStreams.isEmpty) {
+    if (!networkInputStreams.isEmpty && actor != null) {
       receiverExecutor.interrupt()
       receiverExecutor.stopReceivers()
+      ssc.env.actorSystem.stop(actor)
       logInfo("NetworkInputTracker stopped")
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 36225e190cd7917502f23debf6a7b9c77b14743e..461ea3506477f8fcfcb6e6cadb1141bf3bf15473 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -24,9 +24,10 @@ import org.apache.spark.util.Distribution
 sealed trait StreamingListenerEvent
 
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
-
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
+/** An event used in the listener to shutdown the listener daemon thread. */
+private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
 
 /**
  * A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 110a20f282f110879ad7836399f2f9e3784a1ac1..6e6e22e1aff48e2aa1d9efd29c214acbf5612143 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -31,7 +31,7 @@ private[spark] class StreamingListenerBus() extends Logging {
   private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
 
-  new Thread("StreamingListenerBus") {
+  val listenerThread = new Thread("StreamingListenerBus") {
     setDaemon(true)
     override def run() {
       while (true) {
@@ -41,11 +41,18 @@ private[spark] class StreamingListenerBus() extends Logging {
             listeners.foreach(_.onBatchStarted(batchStarted))
           case batchCompleted: StreamingListenerBatchCompleted =>
             listeners.foreach(_.onBatchCompleted(batchCompleted))
+          case StreamingListenerShutdown =>
+            // Get out of the while loop and shutdown the daemon thread
+            return
           case _ =>
         }
       }
     }
-  }.start()
+  }
+
+  def start() {
+    listenerThread.start()
+  }
 
   def addListener(listener: StreamingListener) {
     listeners += listener
@@ -54,9 +61,9 @@ private[spark] class StreamingListenerBus() extends Logging {
   def post(event: StreamingListenerEvent) {
     val eventAdded = eventQueue.offer(event)
     if (!eventAdded && !queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
+      logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
+        "rate at which events are being started by the scheduler.")
       queueFullErrorMessageLogged = true
     }
   }
@@ -68,7 +75,7 @@ private[spark] class StreamingListenerBus() extends Logging {
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty()) {
+    while (!eventQueue.isEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -78,4 +85,6 @@ private[spark] class StreamingListenerBus() extends Logging {
     }
     return true
   }
+
+  def stop(): Unit = post(StreamingListenerShutdown)
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index d644240405caa478f6b838473a8d7f7475615942..559c2473851b30bf73f9d376126f846edf826c6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -20,17 +20,7 @@ package org.apache.spark.streaming.util
 private[streaming]
 class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
   
-  private val minPollTime = 25L
-  
-  private val pollTime = {
-    if (period / 10.0 > minPollTime) {
-      (period / 10.0).toLong
-    } else {
-      minPollTime
-    }  
-  }
-  
-  private val thread = new Thread() {
+  private val thread = new Thread("RecurringTimer") {
     override def run() { loop }    
   }
   
@@ -66,7 +56,6 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
         callback(nextTime)
         nextTime += period
       }
-      
     } catch {
       case e: InterruptedException =>
     }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9eb9b3684cf282dd6c9a339c992af7e4455fc4c7..10c18a7f7e2e59c26b88e8c5796ba163e661e04d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
       ssc.stop()
       ssc = null
     }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
   }
 
   test("from no conf constructor") {
@@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 
   test("stop multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register
+    ssc.start()
     ssc.stop()
     ssc.stop()
     ssc = null
@@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 
   test("stop only streaming context") {
     ssc = new StreamingContext(master, appName, batchDuration)
+    sc = ssc.sparkContext
+    addInputStream(ssc).register
+    ssc.start()
     ssc.stop(false)
     ssc = null
     assert(sc.makeRDD(1 to 100).collect().size === 100)
+    ssc = new StreamingContext(sc, batchDuration)
   }
 
   test("waitForStop") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3569624d5173f44e7c558245d46e05e58625510a..a8ff444109e8b5891e1a1be7eefc202e019ee6de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       val startTime = System.currentTimeMillis()
       while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
         logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
-        Thread.sleep(10)
+        ssc.waitForStop(50)
       }
       val timeTaken = System.currentTimeMillis() - startTime
-
+      logInfo("Output generated in " + timeTaken + " milliseconds")
+      output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
       assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
       assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")