Skip to content
Snippets Groups Projects
Commit bacc65cf authored by Tathagata Das's avatar Tathagata Das
Browse files

Removed slack time in file stream and added better handling of exceptions due...

Removed slack time in file stream and added better handling of exceptions due to failures due FileNotFound exceptions.
parent d4dfab50
No related branches found
No related tags found
No related merge requests found
......@@ -40,9 +40,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Max attempts to try if listing files fail
val MAX_ATTEMPTS = 10
// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
......@@ -109,19 +106,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* (new files found, latest modification time among them, files with latest modification time)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
logDebug("Trying to get new files for time " + currentTime)
var attempts = 0
while (attempts < MAX_ATTEMPTS) {
attempts += 1
try {
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
case ioe: IOException =>
logWarning("Attempt " + attempts + " to get new files failed", ioe)
reset()
}
try {
logDebug("Trying to get new files for time " + currentTime)
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
case e: Exception =>
logError("Attempt to get new files failed", e)
reset()
}
(Seq.empty, -1, Seq.empty)
}
......@@ -193,22 +186,17 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* been seen before (i.e. the file should not be in lastModTimeFiles)
*/
private[streaming]
class CustomPathFilter(currentTime: Long) extends PathFilter {
class CustomPathFilter(maxModTime: Long) extends PathFilter {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
// Creating an RDD from a HDFS file immediately after the file is created sometime returns
// an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older
// than slack time from current time is considered for processing.
val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong
val maxModTime = currentTime - slackTime
def accept(path: Path): Boolean = {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
} else { // Accept file only if
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
val modTime = fs.getFileStatus(path).getModificationTime()
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
......@@ -228,8 +216,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
} catch {
case fnfe: java.io.FileNotFoundException =>
logWarning("Error finding new files", fnfe)
reset()
return false
}
return true
}
}
}
......@@ -237,14 +230,4 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
// Disable slack time (i.e. set it to zero)
private[streaming] def disableSlackTime() {
System.setProperty("spark.streaming.fileStream.slackTime", "0")
}
// Restore default value of slack time
private[streaming] def restoreSlackTime() {
System.clearProperty("spark.streaming.fileStream.slackTime")
}
}
......@@ -200,9 +200,6 @@ class CheckpointSuite extends TestSuiteBase {
val clockProperty = System.getProperty("spark.streaming.clock")
System.clearProperty("spark.streaming.clock")
// Disable slack time of file stream when testing with local file system
FileInputDStream.disableSlackTime()
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
......@@ -303,9 +300,6 @@ class CheckpointSuite extends TestSuiteBase {
// Enable manual clock back again for other tests
if (clockProperty != null)
System.setProperty("spark.streaming.clock", clockProperty)
// Restore the default slack time
FileInputDStream.restoreSlackTime()
}
......
......@@ -152,9 +152,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Disable manual clock as FileInputDStream does not work with manual clock
System.clearProperty("spark.streaming.clock")
// Disable slack time of file stream when testing with local file system
FileInputDStream.disableSlackTime()
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
......@@ -199,9 +196,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enable manual clock back again for other tests
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
// Restore the default slack time
FileInputDStream.restoreSlackTime()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment