Commit c0bb38e8 authored by Tathagata Das

Improved file input stream further.

parent b93f9d42
DStream.scala

@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see
+ * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
+ * live data (such as data from HDFS, Kafka or Flume) or they can be generated by transforming
+ * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
+ * While a Spark Streaming program is running, each DStream periodically generates an RDD,
+ * either from live data or by transforming the RDD generated by a parent DStream.
  *
  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
+ * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
+ * `join`. These operations are automatically available on any DStream of pairs
+ * (e.g., DStream[(Int, Int)]) through implicit conversions when
+ * `org.apache.spark.streaming.StreamingContext._` is imported.
  *
  * Each DStream is internally characterized by a few basic properties:
  *  - A list of other DStreams that the DStream depends on
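
The pair-specific operations mentioned in this scaladoc are easiest to see in a small program. The sketch below is illustrative only, not part of this commit; the master string, host, and port are placeholders.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object WindowedWordCount {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "WindowedWordCount", Seconds(1))
    // A DStream created from live data (here, a socket source).
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    // words.map(...) yields a DStream[(String, Int)], so reduceByKeyAndWindow
    // becomes available through the implicit conversion imported above.
    val counts = words.map(w => (w, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}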
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
     // Set the minimum value of the rememberDuration if not already set
     var minRememberDuration = slideDuration
     if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
-      minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
+      // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
+      minRememberDuration = checkpointDuration * 2
     }
     if (rememberDuration == null || rememberDuration < minRememberDuration) {
       rememberDuration = minRememberDuration
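
The rule above is compact enough to state as a pure function. A hypothetical standalone rendering, using plain milliseconds instead of Spark's Duration type (minimumRememberMs is not a Spark API):

// Remember at least one slide interval; if checkpointing is enabled and the
// slide interval is not already larger, keep two checkpoint intervals so the
// latest checkpoint is never forgotten.
def minimumRememberMs(slideMs: Long, checkpointMs: Option[Long]): Long =
  checkpointMs match {
    case Some(c) if slideMs <= c => c * 2
    case _ => slideMs
  }

assert(minimumRememberMs(1000L, Some(5000L)) == 10000L)
assert(minimumRememberMs(1000L, None) == 1000L)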
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
     if (!isInitialized) {
       throw new Exception (this + " has not been initialized")
     } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
-      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
+        " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
       false
     } else {
       logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
           case Some(newRDD) =>
             if (storageLevel != StorageLevel.NONE) {
               newRDD.persist(storageLevel)
-              logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+              logInfo("Persisting RDD " + newRDD.id + " for time " + time +
+                " to " + storageLevel)
             }
-            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+            if (checkpointDuration != null &&
+                (time - zeroTime).isMultipleOf(checkpointDuration)) {
               newRDD.checkpoint()
-              logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+              logInfo("Marking RDD " + newRDD.id + " for time " + time +
+                " for checkpointing")
             }
             generatedRDDs.put(time, newRDD)
             Some(newRDD)
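
The same isMultipleOf test sets the checkpointing cadence: an RDD is marked only when its batch time lands on a checkpoint boundary. A quick illustration with made-up numbers (zeroTime 0, 2-second batches, 10-second checkpoint interval):

val zeroTime = 0L
val batchTimes = (1L to 10L).map(zeroTime + _ * 2000L)
val checkpointed = batchTimes.filter(t => (t - zeroTime) % 10000L == 0)
assert(checkpointed == Seq(10000L, 20000L))  // every fifth batch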
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
         }
       }
     } else {
-      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
     }
   }
@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (
   /**
    * Return a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
-   * Spark's default number of partitions.
+   * of elements in a sliding window over this DStream. Hash partitioning is used to generate
+   * the RDDs with Spark's default number of partitions.
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
    */
   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
     if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+        + slideDuration + ")")
     }
     if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+        + slideDuration + ")")
     }
     val alignedToTime = toTime.floor(slideDuration)
     val alignedFromTime = fromTime.floor(slideDuration)
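
slice then snaps both endpoints down to batch boundaries with Time.floor, which is plain integer flooring. A hypothetical plain-Long version of the alignment:

def floorMs(timeMs: Long, slideMs: Long): Long = (timeMs / slideMs) * slideMs

val alignedFrom = floorMs(10700L, 2000L)  // 10000
val alignedTo   = floorMs(15300L, 2000L)  // 14000
// RDDs are then fetched for alignedFrom, alignedFrom + slide, ..., alignedTo.
assert((alignedFrom to alignedTo by 2000L) == Seq(10000L, 12000L, 14000L))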
FileInputDStream.scala

@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

-  // Latest file mod time seen till any point of time
-  private val prevModTimeFiles = new HashSet[String]()
-  private var prevModTime = 0L
+  // Files found in the last interval
+  private val lastFoundFiles = new HashSet[String]
+
+  // Files with mod time earlier than this are ignored. This is updated every interval
+  // such that in the current interval, files older than any file found in the
+  // previous interval will be ignored. Obviously this time keeps moving forward.
+  private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()

   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
   @transient private[streaming] var files = new HashMap[Time, Array[String]]
   @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
   @transient private var lastNewFileFindingTime = 0L

-  override def start() {
-    if (newFilesOnly) {
-      prevModTime = graph.zeroTime.milliseconds
-    } else {
-      prevModTime = 0
-    }
-    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
-  }
+  override def start() { }

   override def stop() { }
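
The pair (lastFoundFiles, ignoreTime) replacing prevModTime is the heart of this commit. The hypothetical model below, outside Spark, captures one round of the new selection logic: a file is picked up if its mod time is at least ignoreTime, it was not found in the previous interval, and it is not newer than the current batch time; ignoreTime then advances to the minimum mod time among the newly found files.

import scala.collection.mutable

class NewFileTracker(newFilesOnly: Boolean) {
  private val lastFoundFiles = new mutable.HashSet[String]
  private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()

  /** files maps name -> mod time; batchTime plays the role of maxModTime. */
  def findNewFiles(files: Map[String, Long], batchTime: Long): Seq[String] = {
    val newFiles = files.collect {
      case (name, modTime)
          if modTime >= ignoreTime &&          // not older than the last round's files
             modTime <= batchTime &&           // not too new for this batch
             !lastFoundFiles.contains(name) => // not already picked up
        name
    }.toSeq
    if (newFiles.nonEmpty) {
      lastFoundFiles.clear()
      lastFoundFiles ++= newFiles
      ignoreTime = newFiles.map(files).min     // minimum mod time of this round
    }
    newFiles
  }
}

val tracker = new NewFileTracker(newFilesOnly = true)
assert(tracker.findNewFiles(Map("a" -> 100L, "b" -> 150L), batchTime = 200L).toSet == Set("a", "b"))
// "a" and "b" still pass the mod-time tests (ignoreTime is now 100) but were
// found last round, so only "c" is reported as new.
assert(tracker.findNewFiles(Map("a" -> 100L, "b" -> 150L, "c" -> 160L), batchTime = 400L).toSet == Set("c"))

Keeping the previous round's files is what lets ignoreTime be inclusive: files sharing the minimum mod time are neither processed twice nor silently dropped.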
@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= prevModTime,
-      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
+    assert(validTime.milliseconds >= ignoreTime,
+      "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

     // Find new files
-    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
+    val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    if (newFiles.length > 0) {
-      // Update the modification time and the files processed for that modification time
-      if (prevModTime < latestModTime) {
-        prevModTime = latestModTime
-        prevModTimeFiles.clear()
-      }
-      prevModTimeFiles ++= latestModTimeFiles
-      logDebug("Last mod time updated to " + prevModTime)
+    if (!newFiles.isEmpty) {
+      lastFoundFiles.clear()
+      lastFoundFiles ++= newFiles
+      ignoreTime = minNewFileModTime
     }
     files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
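
None of this bookkeeping is visible to callers, who still create the stream through the ordinary file-stream API. A minimal usage sketch (the directory URL is a placeholder):

import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "FileStreamExample", Seconds(10))
    // Each 10-second batch picks up files whose mod times pass the filter below.
    val lines = ssc.textFileStream("hdfs://namenode:8020/user/streaming/in")
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}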
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+    val oldFiles = files.filter(_._1 < (time - rememberDuration))
     files --= oldFiles.keys
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
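
The change from <= to < is a boundary fix: the time-to-files mapping at exactly time - rememberDuration is now retained rather than cleared. Illustrated with hypothetical numbers:

val rememberMs = 60000L
val now = 100000L
val mappingTimes = Seq(30000L, 40000L, 50000L)
val clearedOld = mappingTimes.filter(_ <= now - rememberMs)  // drops the 40000 entry
val clearedNew = mappingTimes.filter(_ <  now - rememberMs)  // keeps it
assert(clearedOld == Seq(30000L, 40000L))
assert(clearedNew == Seq(30000L))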
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   /**
-   * Find files which have modification timestamp <= current time and return a 3-tuple of
-   * (new files found, latest modification time among them, files with latest modification time)
+   * Find files which have modification timestamp <= current time and return a 2-tuple of
+   * (new files found, minimum modification time among them)
    */
-  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+  private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
     logDebug("Trying to get new files for time " + currentTime)
     lastNewFileFindingTime = System.currentTimeMillis
     val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
         "files in the monitored directory."
       )
     }
-    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+    (newFiles, filter.minNewFileModTime)
   }

   /** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   }

   /**
-   * Custom PathFilter class to find new files that have modification timestamps <= current time,
-   * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+   * Custom PathFilter class to find new files that
+   * ... have modification time not less than the ignore time
+   * ... have not been seen in the last interval
+   * ... have modification time not more than maxModTime
    */
   private[streaming]
   class CustomPathFilter(maxModTime: Long) extends PathFilter {
-    // Latest file mod time seen in this round of fetching files and its corresponding files
-    var latestModTime = 0L
-    val latestModTimeFiles = new HashSet[String]()
+
+    // Minimum of the mod times of new files found in the current interval
+    var minNewFileModTime = -1L
+
     def accept(path: Path): Boolean = {
       try {
         if (!filter(path)) {  // Reject file if it does not satisfy filter
           logDebug("Rejected by filter " + path)
           return false
         }
+        // Reject file if it was found in the last interval
+        if (lastFoundFiles.contains(path.toString)) {
+          logDebug("File " + path + " was already found in the last interval")
+          return false
+        }
         val modTime = getFileModTime(path)
         logDebug("Mod time for " + path + " is " + modTime)
-        if (modTime < prevModTime) {
-          logDebug("Mod time less than last mod time")
-          return false  // If the file was created before the last time it was called
-        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
-          logDebug("Mod time equal to last mod time, but file considered already")
-          return false  // If the file was created exactly as lastModTime but not reported yet
+        if (modTime < ignoreTime) {
+          // Reject file if it was created before the ignore time (i.e., before the last interval)
+          logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
+          return false
         } else if (modTime > maxModTime) {
-          logDebug("Mod time more than ")
-          return false  // If the file is too new that considering it may give errors
+          // Reject file if it is so new that considering it may give errors
+          logDebug("Mod time " + modTime + " more than max mod time " + maxModTime)
+          return false
         }
-        if (modTime > latestModTime) {
-          latestModTime = modTime
-          latestModTimeFiles.clear()
-          logDebug("Latest mod time updated to " + latestModTime)
+        if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
+          minNewFileModTime = modTime
         }
-        latestModTimeFiles += path.toString
         logDebug("Accepted " + path)
       } catch {
         case fnfe: java.io.FileNotFoundException =>