diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d5a5496839407fc104b5044b741452f69e6214ef..7aa9d200046303545af8a50e055a13b7ad39b483 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -81,12 +81,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   private[streaming] def generateRDDs(time: Time): Seq[Job] = {
     this.synchronized {
+      logInfo("Generating RDDs for time " + time)
       outputStreams.flatMap(outputStream => outputStream.generateJob(time))
     }
   }
 
   private[streaming] def forgetOldRDDs(time: Time) {
     this.synchronized {
+      logInfo("Forgetting old RDDs for time " + time)
       outputStreams.foreach(_.forgetOldMetadata(time))
     }
   }
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 5acdd01e5816c99e75ae432d2aa44df3ce30b964..8b18c7bc6a8c223893c34a06edcace0867054ea2 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
       SparkEnv.set(ssc.env)
       try {
         val timeTaken = job.run()
-        logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
-          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
+        logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
+          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
       } catch {
         case e: Exception =>
           logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index b77986a3ba42002e6661ad9534023fa99766247a..23a0f0974d3d474de55d61b20561023f871fcf48 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -40,7 +40,11 @@ class Scheduler(ssc: StreamingContext) extends Logging {
         clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
       }
       // Reschedule the batches that were received but not processed before failure
-      ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
+      //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
+      val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
+      println(pendingTimes.mkString(", "))
+      pendingTimes.foreach(time =>
+        graph.generateRDDs(time).foreach(jobManager.runJob))
       // Restart the timer
       timer.restart(graph.zeroTime.milliseconds)
       logInfo("Scheduler's timer restarted")
@@ -64,11 +68,11 @@ class Scheduler(ssc: StreamingContext) extends Logging {
     graph.generateRDDs(time).foreach(jobManager.runJob)
     graph.forgetOldRDDs(time)
     doCheckpoint(time)
-    logInfo("Generated RDDs for time " + time)
   }
 
   private def doCheckpoint(time: Time) {
     if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+      logInfo("Checkpointing graph for time " + time)
       val startTime = System.currentTimeMillis()
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time))
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 5daeb761ddd670b6daa50ad5226ffb07496e5218..8a6c9a5cb5ff622c6e2e4bd1df272149d4889586 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -39,4 +39,8 @@ case class Time(private val millis: Long) {
 
   override def toString: String = (millis.toString + " ms")
 
+}
+
+object Time {
+  val ordering = Ordering.by((time: Time) => time.millis)
 }
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index c6ffb252ce887c633a6c7210c63a312ed75ee990..10ccb4318dfd6bf718b91960b318b582e52f7fa7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -128,7 +128,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
   private[streaming]
   class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
 
-     def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
 
     override def update() {
       hadoopFiles.clear()
@@ -139,11 +139,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
 
     override def restore() {
       hadoopFiles.foreach {
-        case (time, files) => {
-          logInfo("Restoring Hadoop RDD for time " + time + " from files " +
-            files.mkString("[", ",", "]") )
-          files
-          generatedRDDs += ((time, filesToRDD(files)))
+        case (t, f) => {
+          // Restore the metadata in both files and generatedRDDs
+          logInfo("Restoring files for time " + t + " - " +
+            f.mkString("[", ", ", "]") )
+          files += ((t, f))
+          generatedRDDs += ((t, filesToRDD(f)))
         }
       }
     }
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index c4cfffbfc1ef3cf8f8c79a4963525a0ee936f28f..efaa098d2e193991d2d2aeb51ebe74b3328c8825 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,58 +1,58 @@
 package spark.streaming
 
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{FunSuite, BeforeAndAfter}
 import org.apache.commons.io.FileUtils
 import java.io.File
 import scala.runtime.RichInt
 import scala.util.Random
 import spark.streaming.StreamingContext._
-import collection.mutable.ArrayBuffer
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import spark.Logging
+import com.google.common.io.Files
 
 /**
  * This testsuite tests master failures at random times while the stream is running using
  * the real clock.
  */
-class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+
+  var testDir: File = null
+  var checkpointDir: File = null
+  val batchDuration = Milliseconds(500)
 
   before {
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    testDir = Files.createTempDir()
+    checkpointDir = Files.createTempDir()
   }
 
   after {
     FailureSuite.reset()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    FileUtils.deleteDirectory(checkpointDir)
+    FileUtils.deleteDirectory(testDir)
 
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
   }
 
-  override def framework = "CheckpointSuite"
-
-  override def batchDuration = Milliseconds(500)
-
-  override def checkpointDir = "checkpoint"
-
-  override def checkpointInterval = batchDuration
-
   test("multiple failures with updateStateByKey") {
     val n = 30
     // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
-    val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
-    // Last output: [ (a, 465) ]   for n=30
-    val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+    val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+    // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+    val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
 
     val operation = (st: DStream[String]) => {
      val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
        Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
      }
-     st.map(x => (x, 1))
-     .updateStateByKey[RichInt](updateFunc)
-     .checkpoint(Seconds(2))
-     .map(t => (t._1, t._2.self))
+     st.flatMap(_.split(" "))
+       .map(x => (x, 1))
+       .updateStateByKey[RichInt](updateFunc)
+       .checkpoint(Seconds(2))
+       .map(t => (t._1, t._2.self))
     }
 
-    testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+    testOperationWithMultipleFailures(input, operation, expectedOutput)
   }
 
   test("multiple failures with reduceByKeyAndWindow") {
@@ -60,17 +60,18 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
     val w = 100
     assert(w > n, "Window should be much larger than the number of input sets in this test")
     // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
-    val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
-    // Last output: [ (a, 465) ]
-    val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+    val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+    // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+    val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
 
     val operation = (st: DStream[String]) => {
-      st.map(x => (x, 1))
+      st.flatMap(_.split(" "))
+        .map(x => (x, 1))
         .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
         .checkpoint(Seconds(2))
     }
 
-    testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+    testOperationWithMultipleFailures(input, operation, expectedOutput)
   }
 
 
@@ -79,113 +80,231 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
    * final set of output values is as expected or not. Checking the final value is
    * proof that no intermediate data was lost due to master failures.
    */
-  def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
-    input: Seq[Seq[U]],
-    operation: DStream[U] => DStream[V],
-    lastExpectedOutput: Seq[V],
-    numBatches: Int,
-    numExpectedOutput: Int
+  def testOperationWithMultipleFailures(
+    input: Seq[String],
+    operation: DStream[String] => DStream[(String, Int)],
+    expectedOutput: Seq[(String, Int)]
   ) {
-    var ssc = setupStreams[U, V](input, operation)
-    val mergedOutput = new ArrayBuffer[Seq[V]]()
+    var ssc = setupStreamsWithFileStream(operation)
+
+    val mergedOutput = new ArrayBuffer[(String, Int)]()
+    val lastExpectedOutput = expectedOutput.last
 
+    val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
     var totalTimeRan = 0L
-    while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
-      new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
-      val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
 
+    // Start generating files in the a different thread
+    val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
+    fileGeneratingThread.start()
+
+    // Repeatedly start and kill the streaming context until timed out or
+    // all expected output is generated
+    while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
+
+      // Start the thread to kill the streaming after some time
+      FailureSuite.failed = false
+      val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+      killingThread.start()
+
+      // Run the streams with real clock until last expected output is seen or timed out
+      val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
+      if (killingThread.isAlive) killingThread.interrupt()
+
+      // Merge output and time ran and see whether already timed out or not
       mergedOutput ++= output
       totalTimeRan += timeRan
       logInfo("New output = " + output)
       logInfo("Merged output = " + mergedOutput)
       logInfo("Total time spent = " + totalTimeRan)
-      val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
-      logInfo(
-        "\n-------------------------------------------\n" +
-        "   Restarting stream computation in " + sleepTime + " ms   " +
-        "\n-------------------------------------------\n"
-      )
-      Thread.sleep(sleepTime)
-      FailureSuite.failed = false
-      ssc = new StreamingContext(checkpointDir)
+      if (totalTimeRan > maxTimeToRun) {
+        FailureSuite.timedOut = true
+      }
+
+      if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
+        val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
+        logInfo(
+          "\n-------------------------------------------\n" +
+            "   Restarting stream computation in " + sleepTime + " ms   " +
+            "\n-------------------------------------------\n"
+        )
+        Thread.sleep(sleepTime)
+      }
+
+      // Recreate the streaming context from checkpoint
+      ssc = new StreamingContext(checkpointDir.getPath)
     }
     ssc.stop()
     ssc = null
+    logInfo("Finished test after " + FailureSuite.failureCount + " failures")
+
+    if (FailureSuite.timedOut) {
+      logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " +
+        expectedOutput.size + " batches of " + batchDuration)
+    }
+
+    // Verify whether the output is as expected
+    verifyOutput(mergedOutput, expectedOutput)
+    if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
+  }
 
-    // Verify whether the last output is the expected one
-    val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
-    assert(lastOutput.toSet === lastExpectedOutput.toSet)
-    logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
+  /** Sets up the stream operations with file input stream */
+  def setupStreamsWithFileStream(
+      operation: DStream[String] => DStream[(String, Int)]
+  ): StreamingContext = {
+    val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
+    ssc.checkpoint(checkpointDir.getPath)
+    val inputStream = ssc.textFileStream(testDir.getPath)
+    val operatedStream = operation(inputStream)
+    val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
+    val outputStream = new TestOutputStream(operatedStream, outputBuffer)
+    ssc.registerOutputStream(outputStream)
+    ssc
   }
 
   /**
-   * Runs the streams set up in `ssc` on real clock until the expected max number of
+   * Runs the streams set up in `ssc` on real clock.
    */
-  def runStreamsWithRealClock[V: ClassManifest](
-    ssc: StreamingContext,
-    numBatches: Int,
-    maxExpectedOutput: Int
-  ): (Seq[Seq[V]], Long) = {
+  def runStreamsWithRealClock(
+      ssc: StreamingContext,
+      lastExpectedOutput: (String, Int),
+      timeout: Long
+  ): (Seq[(String, Int)], Long) = {
 
     System.clearProperty("spark.streaming.clock")
 
-    assert(numBatches > 0, "Number of batches to run stream computation is zero")
-    assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
-    logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
-
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
     val output = outputStream.output
-    val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
     val startTime = System.currentTimeMillis()
 
+    // Functions to detect various conditions
+    def hasFailed = FailureSuite.failed
+    def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
+    def isTimedOut = System.currentTimeMillis() - startTime > timeout
+
+    // Start the streaming computation and let it run while ...
+    // (i) StreamingContext has not been shut down yet
+    // (ii) The last expected output has not been generated yet
+    // (iii) Its not timed out yet
     try {
-      // Start computation
       ssc.start()
-
-      // Wait until expected number of output items have been generated
-      while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
-        logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
+      while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
         Thread.sleep(100)
       }
+      logInfo("Has failed = " + hasFailed)
+      logInfo("Is last output generated = " + isLastOutputGenerated)
+      logInfo("Is timed out = " + isTimedOut)
     } catch {
       case e: Exception => logInfo("Exception while running streams: " + e)
     } finally {
       ssc.stop()
     }
+
+    // Verify whether the output of each batch has only one element
+    assert(output.forall(_.size <= 1), "output of each batch should have only one element")
+
+    // Set appropriate flags is timed out or output has been generated
+    if (isTimedOut) FailureSuite.timedOut = true
+    if (isLastOutputGenerated) FailureSuite.outputGenerated = true
+
     val timeTaken = System.currentTimeMillis() - startTime
     logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
-    (output, timeTaken)
+    (output.flatMap(_.headOption), timeTaken)
   }
 
+  /**
+   * Verifies the output value are the same as expected. Since failures can lead to
+   * a batch being processed twice, a batches output may appear more than once
+   * consecutively. To avoid getting confused with those, we eliminate consecutive
+   * duplicate batch outputs of values from the `output`. As a result, the
+   * expected output should not have consecutive batches with the same values as output.
+   */
+  def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
+    // Verify whether expected outputs do not consecutive batches with same output
+    for (i <- 0 until expectedOutput.size - 1) {
+      assert(expectedOutput(i) != expectedOutput(i+1),
+        "Expected output has consecutive duplicate sequence of values")
+    }
 
+    // Match the output with the expected output
+    logInfo(
+      "\n-------------------------------------------\n" +
+        "                Verifying output " +
+        "\n-------------------------------------------\n"
+    )
+    logInfo("Expected output, size = " + expectedOutput.size)
+    logInfo(expectedOutput.mkString("[", ",", "]"))
+    logInfo("Output, size = " + output.size)
+    logInfo(output.mkString("[", ",", "]"))
+    output.foreach(o =>
+      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+    )
+  }
 }
 
 object FailureSuite {
   var failed = false
+  var outputGenerated = false
+  var timedOut = false
   var failureCount = 0
 
   def reset() {
     failed = false
+    outputGenerated = false
+    timedOut = false
     failureCount = 0
   }
 }
 
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
+/**
+ * Thread to kill streaming context after some time.
+ */
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
   initLogging()
 
   override def run() {
-    var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
-    val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
-    logInfo("Kill wait time = " + killWaitTime)
-    Thread.sleep(killWaitTime.toLong)
-    logInfo(
-      "\n---------------------------------------\n" +
-      "Killing streaming context after " + killWaitTime + " ms" +
-      "\n---------------------------------------\n"
-    )
-    if (ssc != null) ssc.stop()
-    FailureSuite.failed = true
-    FailureSuite.failureCount += 1
+    try {
+      var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
+      val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+      logInfo("Kill wait time = " + killWaitTime)
+      Thread.sleep(killWaitTime)
+      logInfo(
+        "\n---------------------------------------\n" +
+          "Killing streaming context after " + killWaitTime + " ms" +
+          "\n---------------------------------------\n"
+      )
+      if (ssc != null) {
+        ssc.stop()
+        FailureSuite.failed = true
+        FailureSuite.failureCount += 1
+      }
+      logInfo("Killing thread exited")
+    } catch {
+      case ie: InterruptedException => logInfo("Killing thread interrupted")
+      case e: Exception => logWarning("Exception in killing thread", e)
+    }
   }
 }
+
+/**
+ * Thread to generate input files periodically with the desired text
+ */
+class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
+  extends Thread with Logging {
+  initLogging()
+
+  override def run() {
+    try {
+      Thread.sleep(5000) // To make sure that all the streaming context has been set up
+      for (i <- 0 until input.size) {
+        FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
+        Thread.sleep(interval)
+      }
+      logInfo("File generating thread exited")
+    } catch {
+      case ie: InterruptedException => logInfo("File generating thread interrupted")
+      case e: Exception => logWarning("File generating in killing thread", e)
+    }
+  }
+}
+