diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index d268b68f904e7c59a00c17090d21f4b6c90a4690..62b225382ea4a89625d513796e12a107080d8ff9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.conf
+
+  // Do not save these driver-specific configurations; they are regenerated on restart
+  sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
@@ -53,6 +56,47 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   }
 }
 
+private[streaming]
+object Checkpoint extends Logging {
+  val PREFIX = "checkpoint-"
+  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
+
+  /** Get the checkpoint file for the given checkpoint time */
+  def checkpointFile(checkpointDir: String, checkpointTime: Time) = {
+    new Path(checkpointDir, PREFIX + checkpointTime.milliseconds)
+  }
+
+  /** Get the checkpoint backup file for the given checkpoint time */
+  def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = {
+    new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
+  }
+
+  /** Get checkpoint files present in the given directory, ordered oldest-first */
+  def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
+    def sortFunc(path1: Path, path2: Path): Boolean = {
+      val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
+      val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
+      (time1 < time2) || (time1 == time2 && bk1)
+    }
+
+    val path = new Path(checkpointDir)
+    if (fs.exists(path)) {
+      val statuses = fs.listStatus(path)
+      if (statuses != null) {
+        val paths = statuses.map(_.getPath)
+        val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
+        filtered.sortWith(sortFunc)
+      } else {
+        logWarning("Listing " + path + " returned null")
+        Seq.empty
+      }
+    } else {
+      logInfo("Checkpoint directory " + path + " does not exist")
+      Seq.empty
+    }
+  }
+}
+
 
 /**
  * Convenience class to handle the writing of graph checkpoint to file
@@ -60,58 +104,67 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
   extends Logging {
-  val file = new Path(checkpointDir, "graph")
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
-  // The file to which we actually write - and then "move" to file
-  val writeFile = new Path(file.getParent, file.getName + ".next")
-  // The file to which existing checkpoint is backed up (i.e. "moved")
-  val bakFile = new Path(file.getParent, file.getName + ".bk")
-
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
-  // I did not notice any errors - reintroduce it ?
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
+      val tempFile = new Path(checkpointDir, "temp")
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
+      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
-          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          // This is inherently thread unsafe, so alleviating it by writing to '.next' and
-          // then moving it to the final file
-          val fos = fs.create(writeFile)
+          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
+
+          // Write checkpoint to temp file
+          fs.delete(tempFile, true)   // just in case it exists
+          val fos = fs.create(tempFile)
           fos.write(bytes)
           fos.close()
 
-          // Back up existing checkpoint if it exists
-          if (fs.exists(file) && fs.rename(file, bakFile)) {
-            logDebug("Moved existing checkpoint file to " + bakFile)
+          // If the checkpoint file already exists, back it up
+          // (delete any stale backup first, otherwise the rename would fail)
+          if (fs.exists(checkpointFile)) {
+            fs.delete(backupFile, true) // just in case it exists
+            if (!fs.rename(checkpointFile, backupFile)) {
+              logWarning("Could not rename " + checkpointFile + " to " + backupFile)
+            }
           }
-          fs.delete(file, false) // paranoia
-
-          // Rename temp written file to the right location
-          if (fs.rename(writeFile, file)) {
-            val finishTime = System.currentTimeMillis()
-            logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
-              "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
-            jobGenerator.onCheckpointCompletion(checkpointTime)
-          } else {
-            throw new SparkException("Failed to rename checkpoint file from "
-              + writeFile + " to " + file)
+
+          // Rename temp file to the final checkpoint file
+          if (!fs.rename(tempFile, checkpointFile)) {
+            logWarning("Could not rename " + tempFile + " to " + checkpointFile)
           }
+
+          // Delete old checkpoint files, keeping only the latest 10
+          val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
+          if (allCheckpointFiles.size > 10) {
+            allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
+              logInfo("Deleting " + file)
+              fs.delete(file, true)
+            })
+          }
+
+          // All done, log success and notify the job generator
+          val finishTime = System.currentTimeMillis()
+          logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
+            "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
+          jobGenerator.onCheckpointCompletion(checkpointTime)
           return
         } catch {
           case ioe: IOException =>
-            logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+            logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
             reset()
         }
       }
-      logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
+      logWarning("Could not write checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
     }
   }
 
@@ -124,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
     bos.close()
     try {
       executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
-      logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+      logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " to writer queue")
     } catch {
       case rej: RejectedExecutionException =>
         logError("Could not submit checkpoint task to the thread pool executor", rej)
@@ -147,7 +200,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
   }
 
   private def fs = synchronized {
-    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+    if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
     fs_
   }
 
@@ -160,36 +213,21 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
 private[streaming]
 object CheckpointReader extends Logging {
 
-  private val graphFileNames = Seq("graph", "graph.bk")
-  
   def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
     val checkpointPath = new Path(checkpointDir)
     def fs = checkpointPath.getFileSystem(hadoopConf)
     
-        // See if the checkpoint directory exists
-    if (!fs.exists(checkpointPath)) {
-      logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist")
-      return None
-    }
-
-    // Try to find the checkpoint data
-    val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
-    if (existingFiles.isEmpty) {
-      logInfo("Could not load checkpoint as checkpoint data was not " +
-        "found in directory " + checkpointDir + "")
-      val statuses = fs.listStatus(checkpointPath)
-      if (statuses!=null) {
-        logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" +
-          statuses.mkString("\n"))
-      }
+    // Try to find the checkpoint files
+    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
+    if (checkpointFiles.isEmpty) {
       return None
     }
-    logInfo("Checkpoint files found: " + existingFiles.mkString(","))
 
-    // Try to read the checkpoint data
+    // Try to read the checkpoint files from latest to oldest
+    logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
     val compressionCodec = CompressionCodec.createCodec(conf)
-    existingFiles.foreach(file => {
-      logInfo("Attempting to load checkpoint from file '" + file + "'")
+    checkpointFiles.foreach(file => {
+      logInfo("Attempting to load checkpoint from file " + file)
       try {
         val fis = fs.open(file)
         // ObjectInputStream uses the last defined user-defined class loader in the stack
@@ -204,15 +242,17 @@ object CheckpointReader extends Logging {
         ois.close()
         fs.close()
         cp.validate()
-        logInfo("Checkpoint successfully loaded from file '" + file + "'")
+        logInfo("Checkpoint successfully loaded from file " + file)
         logInfo("Checkpoint was generated at time " + cp.checkpointTime)
         return Some(cp)
       } catch {
         case e: Exception =>
-          logWarning("Error reading checkpoint from file '" + file + "'", e)
+          logWarning("Error reading checkpoint from file " + file, e)
       }
     })
-    throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'")
+
+    // If none of the checkpoint files could be read, throw an exception
+    throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
   }
 }
 
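For reference, here is a minimal standalone sketch of the ordering rule that the new Checkpoint.getCheckpointFiles relies on (it uses plain file-name strings instead of Hadoop Paths, and the object name CheckpointOrderingSketch is made up for illustration): files sort oldest first, and at equal times the ".bk" backup sorts before the primary file, so the last element is always the newest primary checkpoint. CheckpointReader.read reverses this list to try the newest file first, and the writer uses the same list when deleting all but the most recent checkpoints.

object CheckpointOrderingSketch {
  val Prefix = "checkpoint-"
  val Regex = (Prefix + """([\d]+)([\w\.]*)""").r

  // Same comparator as Checkpoint.getCheckpointFiles: earlier time first,
  // and at equal times the ".bk" backup before the primary file
  def olderThan(name1: String, name2: String): Boolean = {
    val (time1, bk1) = name1 match { case Regex(t, suffix) => (t.toLong, suffix.nonEmpty) }
    val (time2, _)   = name2 match { case Regex(t, suffix) => (t.toLong, suffix.nonEmpty) }
    (time1 < time2) || (time1 == time2 && bk1)
  }

  def main(args: Array[String]): Unit = {
    val names = Seq("checkpoint-3000", "checkpoint-1000", "checkpoint-2000.bk", "checkpoint-2000")
    // prints: checkpoint-1000, checkpoint-2000.bk, checkpoint-2000, checkpoint-3000
    println(names.sortWith(olderThan).mkString(", "))
  }
}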
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 38aa11923902592f628325cf804b777de65dfb6a..1f0f31c4b17aa47e3db82f6fd6c3d5ea20faf891 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -113,6 +113,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
     val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
     logInfo("Finding new files took " + timeTaken + " ms")
+    logDebug("# cached file times = " + fileModTimes.size)
     if (timeTaken > slideDuration.milliseconds) {
       logWarning(
         "Time taken to find new files exceeds the batch size. " +