diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3841b5616dca24471a5d3e85baab617308a24f0a..ee63b3c4a15a26aad6c061cd65c29779ae7ab00c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -63,7 +63,7 @@ trait SparkListener {
    * Called when a task begins remotely fetching its result (will not be called for tasks that do
    * not need to fetch the result remotely).
    */
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
 
   /**
    * Called when a task ends
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
 
   def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
     val stats = d.statCounter
-    logInfo(heading + stats)
     val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    logInfo(heading + stats)
     logInfo(percentilesHeader)
     logInfo("\t" + quantiles.mkString("\t"))
   }
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
-
-
   val seconds = 1000L
   val minutes = seconds * 60
   val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
 }
 
 
-
 case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
 object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d5824e79547974e643b348b12465fa6fe78a2fe0..85687ea330660533c2fb95c1f5016c3db0ffb152 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
     return true
   }
 }
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb536a6565210913758d12db91e3a1306..7b343d2376addc7171546ebf9c6d9e428bcd9d71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -40,7 +40,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
-  val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds
 
   def validate() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 329d2b58357ae36aa9f20f36e9e3e49776679655..a78d3965ee94e0446de016af7e9a309d9d210491 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -17,24 +17,19 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream._
 import StreamingContext._
-import org.apache.spark.util.MetadataCleaner
-
-//import Time._
-
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.conf.Configuration
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b9a58fded67614d40d7a4363badb6ac30dc844fd..daed7ff7c3f1385489c8f691591f100087a39249 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,6 +21,7 @@ import dstream.InputDStream
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import collection.mutable.ArrayBuffer
 import org.apache.spark.Logging
+import org.apache.spark.streaming.scheduler.Job
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
   initLogging()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
deleted file mode 100644
index 5233129506f9e8b2cdd4b0a208d8d01972564d0d..0000000000000000000000000000000000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
-import java.util.concurrent.Executors
-import collection.mutable.HashMap
-import collection.mutable.ArrayBuffer
-
-
-private[streaming]
-class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
-  
-  class JobHandler(ssc: StreamingContext, job: Job) extends Runnable {
-    def run() {
-      SparkEnv.set(ssc.env)
-      try {
-        val timeTaken = job.run()
-        logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
-          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
-      } catch {
-        case e: Exception =>
-          logError("Running " + job + " failed", e)
-      }
-      clearJob(job)
-    }
-  }
-
-  initLogging()
-
-  val jobExecutor = Executors.newFixedThreadPool(numThreads) 
-  val jobs = new HashMap[Time, ArrayBuffer[Job]]
-
-  def runJob(job: Job) {
-    jobs.synchronized {
-      jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
-    }
-    jobExecutor.execute(new JobHandler(ssc, job))
-    logInfo("Added " + job + " to queue")
-  }
-
-  def stop() {
-    jobExecutor.shutdown()
-  }
-
-  private def clearJob(job: Job) {
-    var timeCleared = false
-    val time = job.time
-    jobs.synchronized {
-      val jobsOfTime = jobs.get(time)
-      if (jobsOfTime.isDefined) {
-        jobsOfTime.get -= job
-        if (jobsOfTime.get.isEmpty) {
-          jobs -= time
-          timeCleared = true
-        }
-      } else {
-        throw new Exception("Job finished for time " + job.time +
-          " but time does not exist in jobs")
-      }
-    }
-    if (timeCleared) {
-      ssc.scheduler.clearOldMetadata(time)
-    }
-  }
-
-  def getPendingTimes(): Array[Time] = {
-    jobs.synchronized {
-      jobs.keySet.toArray
-    }
-  }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index d2c4fdee657f756dbc8f57b9b8d999b150e9f05e..41da028a3cf9f0dbfd1dbf68cad6ec39591ed294 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -47,9 +47,9 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 import twitter4j.auth.Authorization
+import org.apache.spark.streaming.scheduler._
 import akka.util.ByteString
 
-
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
  * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
@@ -148,9 +148,10 @@ class StreamingContext private (
     }
   }
 
-  protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null
-  protected[streaming] var receiverJobThread: Thread = null
-  protected[streaming] var scheduler: Scheduler = null
+  protected[streaming] val checkpointDuration: Duration = {
+    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+  }
+  protected[streaming] val scheduler = new JobScheduler(this)
 
   /**
    * Return the associated Spark context
@@ -512,6 +513,13 @@ class StreamingContext private (
     graph.addOutputStream(outputStream)
   }
 
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+    * receiving system events related to streaming.
+    */
+  def addStreamingListener(streamingListener: StreamingListener) {
+    scheduler.listenerBus.addListener(streamingListener)
+  }
+
   protected def validate() {
     assert(graph != null, "Graph is null")
     graph.validate()
@@ -527,27 +535,22 @@ class StreamingContext private (
    * Start the execution of the streams.
    */
   def start() {
-    if (checkpointDir != null && checkpointDuration == null && graph != null) {
-      checkpointDuration = graph.batchDuration
-    }
-
     validate()
 
+    // Get the network input streams
     val networkInputStreams = graph.getInputStreams().filter(s => s match {
         case n: NetworkInputDStream[_] => true
         case _ => false
       }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
 
+    // Start the network input tracker (must start before receivers)
     if (networkInputStreams.length > 0) {
-      // Start the network input tracker (must start before receivers)
       networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
       networkInputTracker.start()
     }
-
     Thread.sleep(1000)
 
     // Start the scheduler
-    scheduler = new Scheduler(this)
     scheduler.start()
   }
 
@@ -558,7 +561,6 @@ class StreamingContext private (
     try {
       if (scheduler != null) scheduler.stop()
       if (networkInputTracker != null) networkInputTracker.stop()
-      if (receiverJobThread != null) receiverJobThread.interrupt()
       sc.stop()
       logInfo("StreamingContext stopped successfully")
    } catch {

For context, a minimal sketch of how the new `addStreamingListener` hook can be used. The context setup and listener body here are illustrative, not part of this patch:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val ssc = new StreamingContext("local[2]", "ListenerExample", Seconds(1))
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    // processingDelay is an Option[Long]; -1 here just marks "not yet known"
    println("Batch " + info.batchTime + " done, processing took " +
      info.processingDelay.getOrElse(-1L) + " ms")
  }
})
```
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 80dcf87491ea3450f062084483a6fed66126d428..78d318cf27b1bdc7f4c2dfe342705e6e7e2a1278 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ssc.remember(duration)
   }
 
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+    * receiving system events related to streaming.
+    */
+  def addStreamingListener(streamingListener: StreamingListener) {
+    ssc.addStreamingListener(streamingListener)
+  }
+
   /**
    * Starts the execution of the streams.
    */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 98b14cb224263778e3edfeb3aa1ad377f69f7bfc..364abcde68c95125d887a6ed0b40ad52611b63eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Job, Time}
+import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 
 private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index d5ae8aef9278a79630474c2710718e11af26bdc7..5add20871e3fd68b83d63d649af903f2d13f4f51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.streaming._
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.rdd.{RDD, BlockRDD}
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
 
 /**
  * Abstract class for defining any InputDStream that has to start a receiver on worker
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4e8d07fe921fbcf2e06ceca5f21e04aced35a6c1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.Time
+
+/**
+ * A class that holds timing information for a single batch of a streaming computation.
+ * @param batchTime   Time of the batch
+ * @param submissionTime  Clock time of when the jobs of this batch were submitted to
+ *                        the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ */
+case class BatchInfo(
+    batchTime: Time,
+    submissionTime: Long,
+    processingStartTime: Option[Long],
+    processingEndTime: Option[Long]
+  ) {
+
+  /**
+   * Time taken for the first job of this batch to start processing from the time this batch
+   * was submitted to the streaming scheduler. Essentially, it is
+   * `processingStartTime` - `submissionTime`.
+   */
+  def schedulingDelay = processingStartTime.map(_ - submissionTime)
+
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they started
+   * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+   */
+  def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they
+   * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
+   */
+  def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
+}
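
As a worked example of how the three Option-valued delays above compose (the timing values are made up for illustration):

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

// Hypothetical timings: batch for t = 1000 ms, submitted at clock time 1005,
// first job started processing at 1010, last job finished at 1030.
val info = BatchInfo(Time(1000), 1005L, Some(1010L), Some(1030L))
assert(info.schedulingDelay == Some(5L))   // 1010 - 1005
assert(info.processingDelay == Some(20L))  // 1030 - 1010
assert(info.totalDelay == Some(25L))       // 5 + 20
```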
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
similarity index 77%
rename from streaming/src/main/scala/org/apache/spark/streaming/Job.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 2128b7c7a64c27c98a8d88db6d27f801b8cf606e..7341bfbc99399b94a1143e12752c1120bf3fbdb3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -15,13 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.streaming.Time
 
+/**
+ * Class representing a Spark computation. It may contain multiple Spark jobs.
+ */
 private[streaming]
 class Job(val time: Time, func: () => _) {
-  val id = Job.getNewId()
+  var id: String = _
+
   def run(): Long = {
     val startTime = System.currentTimeMillis 
     func() 
@@ -29,13 +33,9 @@ class Job(val time: Time, func: () => _) {
     (stopTime - startTime)
   }
 
-  override def toString = "streaming job " + id + " @ " + time 
-}
-
-private[streaming]
-object Job {
-  val id = new AtomicLong(0)
-
-  def getNewId() = id.getAndIncrement()
-}
+  def setId(number: Int) {
+    id = "streaming job " + time + "." + number
+  }
 
+  override def toString = id
+}
\ No newline at end of file
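
With this change, job identifiers are derived from the batch time plus a per-batch index (assigned by `JobSet` below) rather than a global counter. A small sketch of the resulting naming; note `Job` is `private[streaming]`, so this only compiles from within that package, and the rendered form of `Time.toString` ("1000 ms") is an assumption:

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.Job

val job = new Job(Time(1000), () => println("run"))
job.setId(2)
// id is "streaming job " + time + ".2", e.g. "streaming job 1000 ms.2"
println(job.id)
```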
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
similarity index 76%
rename from streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index ed892e33e6c3467f9d9aeef923dc2199a711b89b..1cd0b9b0a4ab76c31ee50e156fcc775f255264f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -15,31 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
 
-import util.{ManualClock, RecurringTimer, Clock}
 import org.apache.spark.SparkEnv
 import org.apache.spark.Logging
+import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
+import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 
+/**
+ * This class generates jobs from DStreams, and also drives checkpointing and the
+ * cleanup of DStream metadata.
+ */
 private[streaming]
-class Scheduler(ssc: StreamingContext) extends Logging {
+class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   initLogging()
-
-  val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
-  val jobManager = new JobManager(ssc, concurrentJobs)
-  val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(ssc.checkpointDir)
-  } else {
-    null
-  }
-
+  val ssc = jobScheduler.ssc
   val clockClass = System.getProperty(
     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
   val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
     longTime => generateJobs(new Time(longTime)))
   val graph = ssc.graph
+  lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+    new CheckpointWriter(ssc.checkpointDir)
+  } else {
+    null
+  }
+
   var latestTime: Time = null
 
   def start() = synchronized {
@@ -48,26 +51,24 @@ class Scheduler(ssc: StreamingContext) extends Logging {
     } else {
       startFirstTime()
     }
-    logInfo("Scheduler started")
+    logInfo("JobGenerator started")
   }
   
   def stop() = synchronized {
     timer.stop()
-    jobManager.stop()
     if (checkpointWriter != null) checkpointWriter.stop()
     ssc.graph.stop()
-    logInfo("Scheduler stopped")    
+    logInfo("JobGenerator stopped")
   }
 
   private def startFirstTime() {
     val startTime = new Time(timer.getStartTime())
     graph.start(startTime - graph.batchDuration)
     timer.start(startTime.milliseconds)
-    logInfo("Scheduler's timer started at " + startTime)
+    logInfo("JobGenerator's timer started at " + startTime)
   }
 
   private def restart() {
-
     // If manual clock is being used for testing, then
     // either set the manual clock to the last checkpointed time,
     // or if the property is defined set it to that time
@@ -93,35 +94,34 @@ class Scheduler(ssc: StreamingContext) extends Logging {
     val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
     logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
     timesToReschedule.foreach(time =>
-      graph.generateJobs(time).foreach(jobManager.runJob)
+      jobScheduler.runJobs(time, graph.generateJobs(time))
     )
 
     // Restart the timer
     timer.start(restartTime.milliseconds)
-    logInfo("Scheduler's timer restarted at " + restartTime)
+    logInfo("JobGenerator's timer restarted at " + restartTime)
   }
 
   /** Generate jobs and perform checkpoint for the given `time`.  */
-  def generateJobs(time: Time) {
+  private def generateJobs(time: Time) {
     SparkEnv.set(ssc.env)
     logInfo("\n-----------------------------------------------------\n")
-    graph.generateJobs(time).foreach(jobManager.runJob)
+    jobScheduler.runJobs(time, graph.generateJobs(time))
     latestTime = time
     doCheckpoint(time)
   }
 
   /**
-   * Clear old metadata assuming jobs of `time` have finished processing.
-   * And also perform checkpoint.
+   * On batch completion, clear old DStream metadata and perform a checkpoint.
    */
-  def clearOldMetadata(time: Time) {
+  private[streaming] def onBatchCompletion(time: Time) {
     ssc.graph.clearOldMetadata(time)
     doCheckpoint(time)
   }
 
   /** Perform checkpoint for the given `time`. */
-  def doCheckpoint(time: Time) = synchronized {
-    if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+  private def doCheckpoint(time: Time) = synchronized {
+    if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time))
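
The checkpoint gating in `doCheckpoint` relies on `isMultipleOf`; a small illustration of the arithmetic, assuming the usual streaming `Time`/`Duration` operators:

```scala
import org.apache.spark.streaming.{Seconds, Time}

val zeroTime = Time(0)
val checkpointInterval = Seconds(10)
// A batch at t = 30 s is checkpointed; one at t = 35 s is not.
assert((Time(30000) - zeroTime).isMultipleOf(checkpointInterval))
assert(!(Time(35000) - zeroTime).isMultipleOf(checkpointInterval))
```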
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9511ccfbeddd6132b455882bddf65c8bd82bb5e6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming._
+
+/**
+ * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
+ * the jobs and runs them using a thread pool. The number of threads is controlled by
+ * the `spark.streaming.concurrentJobs` system property (default: 1).
+ */
+private[streaming]
+class JobScheduler(val ssc: StreamingContext) extends Logging {
+
+  initLogging()
+
+  val jobSets = new ConcurrentHashMap[Time, JobSet]
+  val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+  val executor = Executors.newFixedThreadPool(numConcurrentJobs)
+  val generator = new JobGenerator(this)
+  val listenerBus = new StreamingListenerBus()
+
+  def clock = generator.clock
+
+  def start() {
+    generator.start()
+  }
+
+  def stop() {
+    generator.stop()
+    executor.shutdown()
+    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+      executor.shutdownNow()
+    }
+  }
+
+  def runJobs(time: Time, jobs: Seq[Job]) {
+    if (jobs.isEmpty) {
+      logInfo("No jobs added for time " + time)
+    } else {
+      val jobSet = new JobSet(time, jobs)
+      jobSets.put(time, jobSet)
+      jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
+      logInfo("Added jobs for time " + time)
+    }
+  }
+
+  def getPendingTimes(): Array[Time] = {
+    jobSets.keySet.toArray(new Array[Time](0))
+  }
+
+  private def beforeJobStart(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    if (!jobSet.hasStarted) {
+      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo()))
+    }
+    jobSet.beforeJobStart(job)
+    logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
+    SparkEnv.set(generator.ssc.env)
+  }
+
+  private def afterJobEnd(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    jobSet.afterJobStop(job)
+    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      jobSets.remove(jobSet.time)
+      generator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
+    }
+  }
+
+  private[streaming]
+  class JobHandler(job: Job) extends Runnable {
+    def run() {
+      beforeJobStart(job)
+      try {
+        job.run()
+      } catch {
+        case e: Exception =>
+          logError("Running " + job + " failed", e)
+      }
+      afterJobEnd(job)
+    }
+  }
+}
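
Since `numConcurrentJobs` is read from a system property when the JobScheduler is constructed, enabling concurrent batch processing has to happen before the StreamingContext is created. A sketch:

```scala
// Must be set before the StreamingContext (and hence the JobScheduler) is created.
System.setProperty("spark.streaming.concurrentJobs", "2")
```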
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
new file mode 100644
index 0000000000000000000000000000000000000000..57268674ead9dd22a9c77a941f0195544400999c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable.HashSet
+import org.apache.spark.streaming.Time
+
+/** Class representing a set of Jobs that
+  * belong to the same batch.
+  */
+private[streaming]
+case class JobSet(time: Time, jobs: Seq[Job]) {
+
+  private val incompleteJobs = new HashSet[Job]()
+  var submissionTime = System.currentTimeMillis() // when this jobset was submitted
+  var processingStartTime = -1L // when the first job of this jobset started processing
+  var processingEndTime = -1L // when the last job of this jobset finished processing
+
+  jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+  incompleteJobs ++= jobs
+
+  def beforeJobStart(job: Job) {
+    if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()
+  }
+
+  def afterJobStop(job: Job) {
+    incompleteJobs -= job
+    if (hasCompleted) processingEndTime = System.currentTimeMillis()
+  }
+
+  def hasStarted() = (processingStartTime > 0)
+
+  def hasCompleted() = incompleteJobs.isEmpty
+
+  // Time taken to process all the jobs from the time they started processing
+  // (i.e. not including the time they wait in the streaming scheduler queue)
+  def processingDelay = processingEndTime - processingStartTime
+
+  // Time taken to process all the jobs of this batch, measured from the batch time
+  // (i.e. including the time jobs wait in the streaming scheduler queue)
+  def totalDelay = {
+    processingEndTime - time.milliseconds
+  }
+
+  def toBatchInfo(): BatchInfo = {
+    new BatchInfo(
+      time,
+      submissionTime,
+      if (processingStartTime >= 0) Some(processingStartTime) else None,
+      if (processingEndTime >= 0) Some(processingEndTime) else None
+    )
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
similarity index 98%
rename from streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 6e9a781978d2054f5836b53e321a3ec27521f314..abff55d77c829b5063e7c52ff51dfec4fac53c1a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
 import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
@@ -31,6 +31,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.dispatch._
 import org.apache.spark.storage.BlockId
+import org.apache.spark.streaming.{Time, StreamingContext}
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
new file mode 100644
index 0000000000000000000000000000000000000000..36225e190cd7917502f23debf6a7b9c77b14743e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.mutable.Queue
+import org.apache.spark.util.Distribution
+
+/** Base trait for events related to StreamingListener */
+sealed trait StreamingListenerEvent
+
+case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+
+/**
+ * A listener interface for receiving information about an ongoing streaming
+ * computation.
+ */
+trait StreamingListener {
+  /**
+   * Called when processing of a batch has completed
+   */
+  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+  /**
+   * Called when processing of a batch has started
+   */
+  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
+
+
+/**
+ * A simple StreamingListener that logs summary statistics across Spark Streaming batches.
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ */
+class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+  // Queue containing latest completed batches
+  val batchInfos = new Queue[BatchInfo]()
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+    batchInfos.enqueue(batchCompleted.batchInfo)
+    if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+    printStats()
+  }
+
+  def printStats() {
+    showMillisDistribution("Total delay: ", _.totalDelay)
+    showMillisDistribution("Processing time: ", _.processingDelay)
+  }
+
+  def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
+    org.apache.spark.scheduler.StatsReportListener.showMillisDistribution(
+      heading, extractDistribution(getMetric))
+  }
+
+  def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+    Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
+  }
+}
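
A sketch of wiring the streaming-side `StatsReportListener` into a context via the `addStreamingListener` hook added in this patch (the context setup is illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.StatsReportListener

val ssc = new StreamingContext("local[2]", "StatsExample", Seconds(1))
// Log delay distributions over the last 20 completed batches.
ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 20))
```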
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
new file mode 100644
index 0000000000000000000000000000000000000000..110a20f282f110879ad7836399f2f9e3784a1ac1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.Logging
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.concurrent.LinkedBlockingQueue
+
+/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
+private[spark] class StreamingListenerBus() extends Logging {
+  private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+
+  /* Cap the capacity of the StreamingListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
+  private var queueFullErrorMessageLogged = false
+
+  new Thread("StreamingListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        event match {
+          case batchStarted: StreamingListenerBatchStarted =>
+            listeners.foreach(_.onBatchStarted(batchStarted))
+          case batchCompleted: StreamingListenerBatchCompleted =>
+            listeners.foreach(_.onBatchCompleted(batchCompleted))
+          case _ =>
+        }
+      }
+    }
+  }.start()
+
+  def addListener(listener: StreamingListener) {
+    listeners += listener
+  }
+
+  def post(event: StreamingListenerEvent) {
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded && !queueFullErrorMessageLogged) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+      queueFullErrorMessageLogged = true
+    }
+  }
+
+  /**
+   * Waits until there are no more events in the queue, or until the specified time has elapsed.
+   * Used for testing only. Returns true if the queue has emptied and false if the specified
+   * time elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty()) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    return true
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 259ef1608cbc5b179372995ef4a426df0101247d..b35ca00b53d71cbbfbcb513cdf31e77d87205ff2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -26,18 +26,6 @@ import util.ManualClock
 
 class BasicOperationsSuite extends TestSuiteBase {
 
-  override def framework() = "BasicOperationsSuite"
-
-  before {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-  }
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
-
   test("map") {
     val input = Seq(1 to 4, 5 to 8, 9 to 12)
     testOperation(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index e81287b44e51f2d02b38dec9dfb2088bc48c773f..67a0841535b0d64bd4f0ec53d0d109667ab08435 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -40,31 +40,25 @@ import org.apache.spark.streaming.util.ManualClock
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
  * the whole DStream graph.
  */
-class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+class CheckpointSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  var ssc: StreamingContext = null
+
+  override def batchDuration = Milliseconds(500)
+
+  override def actuallyWait = true // to allow checkpoints to be written
 
-  before {
+  override def beforeFunction() {
+    super.beforeFunction()
     FileUtils.deleteDirectory(new File(checkpointDir))
   }
 
-  after {
+  override def afterFunction() {
+    super.afterFunction()
     if (ssc != null) ssc.stop()
     FileUtils.deleteDirectory(new File(checkpointDir))
-
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
-  var ssc: StreamingContext = null
-
-  override def framework = "CheckpointSuite"
-
-  override def batchDuration = Milliseconds(500)
-
-  override def actuallyWait = true
-
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
 
    assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6337c5359c3dcac1d8206d8881b6b28864484522..da9b04de1ac44ee4299bb58003412718de0eb545 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -32,17 +32,22 @@ import collection.mutable.ArrayBuffer
  * This testsuite tests master failures at random times while the stream is running using
  * the real clock.
  */
-class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+class FailureSuite extends TestSuiteBase with Logging {
 
   var directory = "FailureSuite"
   val numBatches = 30
-  val batchDuration = Milliseconds(1000)
 
-  before {
+  override def batchDuration = Milliseconds(1000)
+
+  override def useManualClock = false
+
+  override def beforeFunction() {
+    super.beforeFunction()
     FileUtils.deleteDirectory(new File(directory))
   }
 
-  after {
+  override def afterFunction() {
+    super.afterFunction()
     FileUtils.deleteDirectory(new File(directory))
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7dc82decefd6e0ac3cb69acf51d95840b2aeef55..62a9f120b46153e713567d50ef777ce692bf0544 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,18 +50,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   val testPort = 9999
 
-  override def checkpointDir = "checkpoint"
-
-  before {
-    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-  }
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
-
   test("socket input stream") {
     // Start the server
     val testServer = new TestServer()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fa6414209605405e2a70834409bb3851e10b6422
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.streaming.scheduler._
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.matchers.ShouldMatchers
+
+class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
+
+  val input = (1 to 4).map(Seq(_)).toSeq
+  val operation = (d: DStream[Int]) => d.map(x => x)
+
+  // To make sure that the processing start and end times in collected
+  // information are different for successive batches
+  override def batchDuration = Milliseconds(100)
+  override def actuallyWait = true
+
+  test("basic BatchInfo generation") {
+    val ssc = setupStreams(input, operation)
+    val collector = new BatchInfoCollector
+    ssc.addStreamingListener(collector)
+    runStreams(ssc, input.size, input.size)
+    val batchInfos = collector.batchInfos
+    batchInfos should have size 4
+
+    batchInfos.foreach(info => {
+      info.schedulingDelay should not be None
+      info.processingDelay should not be None
+      info.totalDelay should not be None
+      info.schedulingDelay.get should be >= 0L
+      info.processingDelay.get should be >= 0L
+      info.totalDelay.get should be >= 0L
+    })
+
+    isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+  }
+
+  /** Check if a sequence of numbers is in non-decreasing order */
+  def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
+    for(i <- 1 until seq.size) {
+      if (seq(i - 1) > seq(i)) return false
+    }
+    true
+  }
+
+  /** Listener that collects information on processed batches */
+  class BatchInfoCollector extends StreamingListener {
+    val batchInfos = new ArrayBuffer[BatchInfo]
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+      batchInfos += batchCompleted.batchInfo
+    }
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 2f34e812a1009df2ccc68f3be07c8426b090b080..e969e91d13e9ad762a92e777b751e8d7abd12560 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -110,7 +110,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
   // Name of the framework for Spark context
-  def framework = "TestSuiteBase"
+  def framework = this.getClass.getSimpleName
 
   // Master for Spark context
   def master = "local[2]"
@@ -127,9 +127,39 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   // Maximum time to wait before the test times out
   def maxWaitTimeMillis = 10000
 
+  // Whether to use manual clock or not
+  def useManualClock = true
+
   // Whether to actually wait in real time before changing manual clock
   def actuallyWait = false
 
+  // Default "before" function for any streaming test suite. Override this to add
+  // custom setup logic (i.e., do not call before { } directly in subclasses).
+  def beforeFunction() {
+    if (useManualClock) {
+      System.setProperty(
+        "spark.streaming.clock",
+        "org.apache.spark.streaming.util.ManualClock"
+      )
+    } else {
+      System.clearProperty("spark.streaming.clock")
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+  }
+
+  // Default "after" function for any streaming test suite. Override this to add
+  // custom cleanup logic (i.e., do not call after { } directly in subclasses).
+  def afterFunction() {
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+  }
+
+  before(beforeFunction)
+  after(afterFunction)
+
   /**
    * Set up required DStreams to test the DStream operation using the two sequences
    * of input collections.
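
A sketch of how a suite opts out of the manual clock under the new hooks, mirroring what FailureSuite does above (the suite name is hypothetical, and this only compiles within the streaming test sources):

```scala
import org.apache.spark.streaming.{Milliseconds, TestSuiteBase}

// Runs against the real system clock instead of ManualClock.
class MyRealTimeSuite extends TestSuiteBase {
  override def useManualClock = false
  override def batchDuration = Milliseconds(1000)
}
```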
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index f50e05c0d883b97efbd0a92e7a3b9aedb9526fbb..6b4aaefcdf90f02ffaa26e5ed2faa6c5e1f565b0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -22,19 +22,9 @@ import collection.mutable.ArrayBuffer
 
 class WindowOperationsSuite extends TestSuiteBase {
 
-  System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  override def maxWaitTimeMillis = 20000  // large window tests can sometimes take longer
 
-  override def framework = "WindowOperationsSuite"
-
-  override def maxWaitTimeMillis = 20000
-
-  override def batchDuration = Seconds(1)
-
-  after {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
-  }
+  override def batchDuration = Seconds(1)  // making sure it's explicitly set in this class
 
   val largerSlideInput = Seq(
     Seq(("a", 1)),