diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7c4cca7eacab61ade04b4636d89c765b89890b6..9dfcc08abea95a28549dd8d5f8b9f1f27629d685 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see
+ * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
+ * live data (such as data from HDFS, Kafka or Flume) or generated by transforming existing
+ * DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
+ * While a Spark Streaming program is running, each DStream periodically generates an RDD,
+ * either from live data or by transforming the RDD generated by a parent DStream.
  *
  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
+ * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
+ * `join`. These operations are automatically available on any DStream of pairs
+ * (e.g., DStream[(Int, Int)]) through implicit conversions when
+ * `org.apache.spark.streaming.StreamingContext._` is imported.
  *
- * DStreams internally is characterized by a few basic properties:
+ * Internally, a DStream is characterized by a few basic properties:
  *  - A list of other DStreams that the DStream depends on
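(For context on the Scaladoc above, a minimal usage sketch of the implicit conversions it describes;
the local master, application name, socket source, host and port are illustrative assumptions, not
part of this patch.)

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // brings PairDStreamFunctions into scope

    val ssc = new StreamingContext("local[2]", "PairOpsExample", Seconds(1))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    // A DStream[(String, Int)] picks up reduceByKeyAndWindow through the implicit conversion
    val counts = words.map(w => (w, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
    counts.print()
    ssc.start()
    ssc.awaitTermination()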
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
     // Set the minimum value of the rememberDuration if not already set
     var minRememberDuration = slideDuration
     if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
-      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgetten
+      // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
+      minRememberDuration = checkpointDuration * 2
     }
     if (rememberDuration == null || rememberDuration < minRememberDuration) {
       rememberDuration = minRememberDuration
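(A worked example of the rule above, with assumed values: for a DStream with a 2-second slide
duration and a 10-second checkpoint interval, minRememberDuration becomes 2 * 10 = 20 seconds, and
any rememberDuration smaller than that is raised to 20 seconds.)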
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
     if (!isInitialized) {
       throw new Exception (this + " has not been initialized")
     } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
-      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
+        " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
       false
     } else {
       logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
             case Some(newRDD) =>
               if (storageLevel != StorageLevel.NONE) {
                 newRDD.persist(storageLevel)
-                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+                logInfo("Persisting RDD " + newRDD.id + " for time " +
+                  time + " to " + storageLevel + " at time " + time)
               }
-              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+              if (checkpointDuration != null &&
+                (time - zeroTime).isMultipleOf(checkpointDuration)) {
                 newRDD.checkpoint()
-                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+                logInfo("Marking RDD " + newRDD.id + " for time " + time +
+                  " for checkpointing at time " + time)
               }
               generatedRDDs.put(time, newRDD)
               Some(newRDD)
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
         }
       }
     } else {
-      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
     }
   }
 
@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (
 
   /**
    * Return a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
-   * Spark's default number of partitions.
+   * of elements in a sliding window over this DStream. Hash partitioning is used to generate
+   * the RDDs with Spark's default number of partitions.
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
    */
   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
     if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+        + slideDuration + ")")
     }
     if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+        + slideDuration + ")")
     }
     val alignedToTime = toTime.floor(slideDuration)
     val alignedFromTime = fromTime.floor(slideDuration)
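(A small illustration of the alignment that slice() performs above; the 10-second slide duration
and the millisecond values are assumptions chosen for the example.)

    import org.apache.spark.streaming.{Seconds, Time}

    val slide = Seconds(10)
    val from = Time(13000)               // not on a batch boundary
    val to = Time(47000)
    val alignedFrom = from.floor(slide)  // Time(10000)
    val alignedTo = to.floor(slide)      // Time(40000)
    // slice() then looks up the RDDs for Time(10000), Time(20000), ..., Time(40000)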
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 37c46b26a50b54bdd1b213b2fb627aa0ea895709..8a6051622e2d5b24c0656791338db62d7a3a1d76 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
-  // Latest file mod time seen till any point of time
-  private val prevModTimeFiles = new HashSet[String]()
-  private var prevModTime = 0L
+  // Files found in the last interval in which new files were detected
+  private val lastFoundFiles = new HashSet[String]
+
+  // Files with mod times earlier than this are ignored. This is updated every interval
+  // so that, in the current interval, files older than any file found in the
+  // previous interval are ignored. This time therefore only moves forward.
+  private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
 
+  // Transient fields, reinitialized or lazily recomputed after deserialization
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
   @transient private[streaming] var files = new HashMap[Time, Array[String]]
   @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
   @transient private var lastNewFileFindingTime = 0L
 
-  override def start() {
-    if (newFilesOnly) {
-      prevModTime = graph.zeroTime.milliseconds
-    } else {
-      prevModTime = 0
-    }
-    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
-  }
+  override def start() { }
 
   override def stop() { }
 
@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= prevModTime,
-      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
+    assert(validTime.milliseconds >= ignoreTime,
+      "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
 
     // Find new files
-    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
+    val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    if (newFiles.length > 0) {
-      // Update the modification time and the files processed for that modification time
-      if (prevModTime < latestModTime) {
-        prevModTime = latestModTime
-        prevModTimeFiles.clear()
-      }
-      prevModTimeFiles ++= latestModTimeFiles
-      logDebug("Last mod time updated to " + prevModTime)
+    if (!newFiles.isEmpty) {
+      lastFoundFiles.clear()
+      lastFoundFiles ++= newFiles
+      ignoreTime = minNewFileModTime
     }
     files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
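(To summarize the net effect of the compute() change above together with the CustomPathFilter
changes further below, a simplified standalone sketch of the selection rule; the function name and
shape are illustrative, not code from this patch.)

    // A file is picked up when its mod time lies in [ignoreTime, maxModTime] and it was not
    // already picked up in the previous interval. ignoreTime then advances to the smallest
    // mod time among the newly found files, so it only moves forward.
    def selectNewFiles(
        candidates: Seq[(String, Long)],  // (path, mod time)
        ignoreTime: Long,
        maxModTime: Long,
        lastFoundFiles: Set[String]): (Seq[String], Long) = {
      val accepted = candidates.filter { case (path, modTime) =>
        !lastFoundFiles.contains(path) && modTime >= ignoreTime && modTime <= maxModTime
      }
      val newIgnoreTime = if (accepted.nonEmpty) accepted.map(_._2).min else ignoreTime
      (accepted.map(_._1), newIgnoreTime)
    }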
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+    val oldFiles = files.filter(_._1 < (time - rememberDuration))
     files --= oldFiles.keys
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
-   * Find files which have modification timestamp <= current time and return a 3-tuple of
-   * (new files found, latest modification time among them, files with latest modification time)
+   * Find new files which have modification timestamp <= current time and return a 2-tuple of
+   * (new files found, earliest modification time among them)
    */
-  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+  private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
     logDebug("Trying to get new files for time " + currentTime)
     lastNewFileFindingTime = System.currentTimeMillis
     val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           "files in the monitored directory."
       )
     }
-    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+    (newFiles, filter.minNewFileModTime)
   }
 
   /** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   /**
-   * Custom PathFilter class to find new files that have modification timestamps <= current time,
-   * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+   * Custom PathFilter class to find new files that
+   * ... have a modification time no earlier than the ignore time,
+   * ... have not been seen in the last interval, and
+   * ... have a modification time no greater than maxModTime.
    */
   private[streaming]
   class CustomPathFilter(maxModTime: Long) extends PathFilter {
-    // Latest file mod time seen in this round of fetching files and its corresponding files
-    var latestModTime = 0L
-    val latestModTimeFiles = new HashSet[String]()
+
+    // Minimum of the mod times of new files found in the current interval
+    var minNewFileModTime = -1L
+
     def accept(path: Path): Boolean = {
       try {
         if (!filter(path)) {  // Reject file if it does not satisfy filter
           logDebug("Rejected by filter " + path)
           return false
         }
+        // Reject file if it was found in the last interval
+        if (lastFoundFiles.contains(path.toString)) {
+          logDebug("Mod time equal to last mod time, but file considered already")
+          return false
+        }
         val modTime = getFileModTime(path)
         logDebug("Mod time for " + path + " is " + modTime)
-        if (modTime < prevModTime) {
-          logDebug("Mod time less than last mod time")
-          return false  // If the file was created before the last time it was called
-        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
-          logDebug("Mod time equal to last mod time, but file considered already")
-          return false  // If the file was created exactly as lastModTime but not reported yet
+        if (modTime < ignoreTime) {
+          // Reject file if it is older than the ignore time set by the last interval
+          logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
+          return false
         } else if (modTime > maxModTime) {
-          logDebug("Mod time more than ")
-          return false  // If the file is too new that considering it may give errors
+          // Reject file if it is so new that considering it may give errors
+          logDebug("Mod time " + modTime + " more than max mod time " + maxModTime)
+          return false
         }
-        if (modTime > latestModTime) {
-          latestModTime = modTime
-          latestModTimeFiles.clear()
-          logDebug("Latest mod time updated to " + latestModTime)
+        if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
+          minNewFileModTime = modTime
         }
-        latestModTimeFiles += path.toString
         logDebug("Accepted " + path)
       } catch {
         case fnfe: java.io.FileNotFoundException =>