Skip to content
Snippets Groups Projects
Commit be647191 authored by Tathagata Das
Browse files

Changed file stream to not catch any exceptions related to finding new files...

Changed file stream to not catch any exceptions related to finding new files (FileNotFound exception is still caught and ignored).
parent bacc65cf
No related branches found
No related tags found
No related merge requests found
......@@ -17,18 +17,17 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
......@@ -106,17 +105,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* (new files found, latest modification time among them, files with latest modification time)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
// NOTE(review): this body looks like a diff hunk whose +/- markers were stripped, so the
// pre-change and post-change implementations appear back to back. Confirm against the
// real file before treating this span as compilable source.
//
// Presumably the removed version: list `path` through a CustomPathFilter built from
// currentTime and return early on success. Any Exception is logged, reset() is called,
// and control falls through to the empty result after the catch.
try {
logDebug("Trying to get new files for time " + currentTime)
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
case e: Exception =>
logError("Attempt to get new files failed", e)
reset()
}
// Fallback result of the try/catch version: no new files, -1 as the modification time,
// and no files with the latest modification time.
(Seq.empty, -1, Seq.empty)
// Presumably the added version: the same listing with no try/catch around it, so an
// exception thrown by fs.listStatus is not handled inside this method.
logDebug("Trying to get new files for time " + currentTime)
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
// Result tuple: (new file paths as strings, filter.latestModTime, filter.latestModTimeFiles).
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
}
/** Generate one RDD from an array of files */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment