Commit cb0e9b09 authored by Tathagata Das

[SPARK-4518][SPARK-4519][Streaming] Refactored file stream to prevent files from being processed multiple times

Because of a corner case, a file already selected for batch t can get considered again for batch t+2. This refactoring fixes it by remembering all the files selected in the last 1 minute, so that this corner case does not arise. It also uses the SparkContext's Hadoop configuration to access the file system API for listing directories.
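To make the new selection rule concrete, here is a minimal standalone sketch of the criterion this refactoring introduces (hypothetical, simplified names; not the actual FileInputDStream code): a file is picked for a batch only if its mod time is newer than the ignore threshold, no newer than the batch time, and the file was not already selected in a remembered batch.

// Sketch only: illustrates the selection rule described above with simplified names.
object NewFileRuleSketch {
  // The "last 1 minute" remember window mentioned above
  val rememberWindowMs: Long = 60 * 1000L

  def isNewFile(
      path: String,
      modTime: Long,
      batchTime: Long,
      initialIgnoreThreshold: Long,
      recentlySelected: Set[String]): Boolean = {
    // Ignore threshold = later of the start-up threshold and the trailing end
    // of the remember window for this batch
    val ignoreThreshold = math.max(initialIgnoreThreshold, batchTime - rememberWindowMs)
    modTime > ignoreThreshold &&         // strictly greater, to avoid SPARK-4518
      modTime <= batchTime &&            // too-new files wait for a later batch
      !recentlySelected.contains(path)   // already picked in a remembered batch
  }
}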

pwendell Please take a look. I still have not run the long-running integration tests, so I cannot say for sure whether this has indeed solved the issue. You could do a first pass on this in the meantime.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3419 from tdas/filestream-fix2 and squashes the following commits:

c19dd8a [Tathagata Das] Addressed PR comments.
513b608 [Tathagata Das] Updated docs.
d364faf [Tathagata Das] Added the current time condition back
5526222 [Tathagata Das] Removed unnecessary imports.
38bb736 [Tathagata Das] Fix long line.
203bbc7 [Tathagata Das] Un-ignore tests.
eaef4e1 [Tathagata Das] Fixed SPARK-4519
9dbd40a [Tathagata Das] Refactored FileInputDStream to remember last few batches.
parent 4a90276a
DStream.scala
@@ -254,7 +254,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   private[streaming] def remember(duration: Duration) {
-    if (duration != null && duration > rememberDuration) {
+    if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
       rememberDuration = duration
       logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
     }
FileInputDStream.scala
@@ -17,18 +17,55 @@
 package org.apache.spark.streaming.dstream
 
-import java.io.{ObjectInputStream, IOException}
-import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{IOException, ObjectInputStream}
+
+import scala.collection.mutable
 import scala.reflect.ClassTag
+
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming._
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
+/**
+ * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
+ * files and creates a stream out of them. The way it works as follows.
+ *
+ * At each batch interval, the file system is queried for files in the given directory and
+ * detected new files are selected for that batch. In this case "new" means files that
+ * became visible to readers during that time period. Some extra care is needed to deal
+ * with the fact that files may become visible after they are created. For this purpose, this
+ * class remembers the information about the files selected in past batches for
+ * a certain duration (say, "remember window") as shown in the figure below.
+ *
+ *                      |<----- remember window ----->|
+ * ignore threshold --->|                             |<--- current batch time
+ *                      |____.____.____.____.____.____|
+ *                      |    |    |    |    |    |    |
+ * ---------------------|----|----|----|----|----|----|-----------------------> Time
+ *                      |____|____|____|____|____|____|
+ *                             remembered batches
+ *
+ * The trailing end of the window is the "ignore threshold" and all files whose mod times
+ * are less than this threshold are assumed to have already been selected and are therefore
+ * ignored. Files whose mod times are within the "remember window" are checked against files
+ * that have already been selected. At a high level, this is how new files are identified in
+ * each batch - files whose mod times are greater than the ignore threshold and
+ * have not been considered within the remember window. See the documentation on the method
+ * `isNewFile` for more details.
+ *
+ * This makes some assumptions from the underlying file system that the system is monitoring.
+ * - The clock of the file system is assumed to synchronized with the clock of the machine running
+ *   the streaming app.
+ * - If a file is to be visible in the directory listings, it must be visible within a certain
+ *   duration of the mod time of the file. This duration is the "remember window", which is set to
+ *   1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+ *   selected as the mod time will be less than the ignore threshold when it becomes visible.
+ * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
+ *   processing semantics are undefined.
+ */
 private[streaming]
 class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     @transient ssc_ : StreamingContext,
@@ -37,22 +74,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {
 
+  // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
-  // files found in the last interval
-  private val lastFoundFiles = new HashSet[String]
-
-  // Files with mod time earlier than this is ignored. This is updated every interval
-  // such that in the current interval, files older than any file found in the
-  // previous interval will be ignored. Obviously this time keeps moving forward.
-  private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
-
-  // Latest file mod time seen till any point of time
+  // Initial ignore threshold based on which old, existing files in the directory (at the time of
+  // starting the streaming application) will be ignored or considered
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+
+  /*
+   * Make sure that the information of files selected in the last few batches are remembered.
+   * This would allow us to filter away not-too-old files which have already been recently
+   * selected and processed.
+   */
+  private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+  private val durationToRemember = slideDuration * numBatchesToRemember
+  remember(durationToRemember)
+
+  // Map of batch-time to selected file info for the remembered batches
+  @transient private[streaming] var batchTimeToSelectedFiles =
+    new mutable.HashMap[Time, Array[String]]
+
+  // Set of files that were selected in the remembered batches
+  @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
+
+  // Read-through cache of file mod times, used to speed up mod time lookups
+  @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
+
+  // Timestamp of the last round of finding files
+  @transient private var lastNewFileFindingTime = 0L
+
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
-  @transient private[streaming] var files = new HashMap[Time, Array[String]]
-  @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
-  @transient private var lastNewFileFindingTime = 0L
 
   override def start() { }
@@ -68,54 +120,113 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= ignoreTime,
-      "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")
-
     // Find new files
-    val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
+    val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    if (!newFiles.isEmpty) {
-      lastFoundFiles.clear()
-      lastFoundFiles ++= newFiles
-      ignoreTime = minNewFileModTime
-    }
-    files += ((validTime, newFiles.toArray))
+    batchTimeToSelectedFiles += ((validTime, newFiles))
+    recentlySelectedFiles ++= newFiles
     Some(filesToRDD(newFiles))
   }
 
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = files.filter(_._1 < (time - rememberDuration))
-    files --= oldFiles.keys
+    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
+    batchTimeToSelectedFiles --= oldFiles.keys
+    recentlySelectedFiles --= oldFiles.values.flatten
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
     logDebug("Cleared files are:\n" +
      oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
     // Delete file mod times that weren't accessed in the last round of getting new files
-    fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
+    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
-   * Find files which have modification timestamp <= current time and return a 3-tuple of
-   * (new files found, latest modification time among them, files with latest modification time)
+   * Find new files for the batch of `currentTime`. This is done by first calculating the
+   * ignore threshold for file mod times, and then getting a list of files filtered based on
+   * the current batch time and the ignore threshold. The ignore threshold is the max of
+   * initial ignore threshold and the trailing end of the remember window (that is, which ever
+   * is later in time).
    */
-  private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
-    logDebug("Trying to get new files for time " + currentTime)
-    lastNewFileFindingTime = System.currentTimeMillis
-    val filter = new CustomPathFilter(currentTime)
-    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-    val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
-    logInfo("Finding new files took " + timeTaken + " ms")
-    logDebug("# cached file times = " + fileModTimes.size)
-    if (timeTaken > slideDuration.milliseconds) {
-      logWarning(
-        "Time taken to find new files exceeds the batch size. " +
-        "Consider increasing the batch size or reduceing the number of " +
-        "files in the monitored directory."
-      )
-    }
-    (newFiles, filter.minNewFileModTime)
+  private def findNewFiles(currentTime: Long): Array[String] = {
+    try {
+      lastNewFileFindingTime = System.currentTimeMillis
+
+      // Calculate ignore threshold
+      val modTimeIgnoreThreshold = math.max(
+        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
+        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
+      )
+      logDebug(s"Getting new files for time $currentTime, " +
+        s"ignoring files older than $modTimeIgnoreThreshold")
+      val filter = new PathFilter {
+        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
+      }
+      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+      logInfo("Finding new files took " + timeTaken + " ms")
+      logDebug("# cached file times = " + fileToModTime.size)
+      if (timeTaken > slideDuration.milliseconds) {
+        logWarning(
+          "Time taken to find new files exceeds the batch size. " +
+          "Consider increasing the batch size or reducing the number of " +
+          "files in the monitored directory."
+        )
+      }
+      newFiles
+    } catch {
+      case e: Exception =>
+        logWarning("Error finding new files", e)
+        reset()
+        Array.empty
+    }
   }
 
+  /**
+   * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be
+   * accepted, it has to pass the following criteria.
+   * - It must pass the user-provided file filter.
+   * - It must be newer than the ignore threshold. It is assumed that files older than the ignore
+   *   threshold have already been considered or are existing files before start
+   *   (when newFileOnly = true).
+   * - It must not be present in the recently selected files that this class remembers.
+   * - It must not be newer than the time of the batch (i.e. `currentTime` for which this
+   *   file is being tested. This can occur if the driver was recovered, and the missing batches
+   *   (during downtime) are being generated. In that case, a batch of time T may be generated
+   *   at time T+x. Say x = 5. If that batch T contains file of mod time T+5, then bad things can
+   *   happen. Let's say the selected files are remembered for 60 seconds. At time t+61,
+   *   the batch of time t is forgotten, and the ignore threshold is still T+1.
+   *   The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
+   *   Hence they can get selected as new files again. To prevent this, files whose mod time is more
+   *   than current batch time are not considered.
+   */
+  private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+    val pathStr = path.toString
+    // Reject file if it does not satisfy filter
+    if (!filter(path)) {
+      logDebug(s"$pathStr rejected by filter")
+      return false
+    }
+    // Reject file if it was created before the ignore time
+    val modTime = getFileModTime(path)
+    if (modTime <= modTimeIgnoreThreshold) {
+      // Use <= instead of < to avoid SPARK-4518
+      logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
+      return false
+    }
+    // Reject file if mod time > current batch time
+    if (modTime > currentTime) {
+      logDebug(s"$pathStr not selected as mod time $modTime > current time $currentTime")
+      return false
+    }
+    // Reject file if it was considered earlier
+    if (recentlySelectedFiles.contains(pathStr)) {
+      logDebug(s"$pathStr already considered")
+      return false
+    }
+    logDebug(s"$pathStr accepted with mod time $modTime")
+    return true
+  }
 
   /** Generate one RDD from an array of files */
@@ -132,21 +243,21 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
+  /** Get file mod time from cache or fetch it from the file system */
+  private def getFileModTime(path: Path) = {
+    fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
+  }
+
   private def directoryPath: Path = {
     if (path_ == null) path_ = new Path(directory)
     path_
   }
 
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
+    if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
     fs_
   }
 
-  private def getFileModTime(path: Path) = {
-    // Get file mod time from cache or fetch it from the file system
-    fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
-  }
-
   private def reset()  {
     fs_ = null
   }
@@ -155,9 +266,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
-    files = new HashMap[Time, Array[String]]
-    fileModTimes = new TimeStampedHashMap[String, Long](true)
+    generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    recentlySelectedFiles = new mutable.HashSet[String]()
+    fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
 
   /**
@@ -167,11 +279,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
   private[streaming]
   class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
 
-    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+    def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
 
     override def update(time: Time) {
      hadoopFiles.clear()
-      hadoopFiles ++= files
+      hadoopFiles ++= batchTimeToSelectedFiles
    }
 
     override def cleanup(time: Time) { }
@@ -182,7 +294,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
          // Restore the metadata in both files and generatedRDDs
          logInfo("Restoring files for time " + t + " - " +
            f.mkString("[", ", ", "]") )
-          files += ((t, f))
+          batchTimeToSelectedFiles += ((t, f))
+          recentlySelectedFiles ++= f
          generatedRDDs += ((t, filesToRDD(f)))
        }
      }
@@ -193,57 +306,25 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
        hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+}
 
-  /**
-   * Custom PathFilter class to find new files that
-   * ... have modification time more than ignore time
-   * ... have not been seen in the last interval
-   * ... have modification time less than maxModTime
-   */
-  private[streaming]
-  class CustomPathFilter(maxModTime: Long) extends PathFilter {
-
-    // Minimum of the mod times of new files found in the current interval
-    var minNewFileModTime = -1L
-
-    def accept(path: Path): Boolean = {
-      try {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        }
-        // Reject file if it was found in the last interval
-        if (lastFoundFiles.contains(path.toString)) {
-          logDebug("Mod time equal to last mod time, but file considered already")
-          return false
-        }
-        val modTime = getFileModTime(path)
-        logDebug("Mod time for " + path + " is " + modTime)
-        if (modTime < ignoreTime) {
-          // Reject file if it was created before the ignore time (or, before last interval)
-          logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
-          return false
-        } else if (modTime > maxModTime) {
-          // Reject file if it is too new that considering it may give errors
-          logDebug("Mod time more than ")
-          return false
-        }
-        if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
-          minNewFileModTime = modTime
-        }
-        logDebug("Accepted " + path)
-      } catch {
-        case fnfe: java.io.FileNotFoundException =>
-          logWarning("Error finding new files", fnfe)
-          reset()
-          return false
-      }
-      true
-    }
-  }
-}
 
 private[streaming]
 object FileInputDStream {
+
+  /**
+   * Minimum duration of remembering the information of selected files. Files with mod times
+   * older than this "window" of remembering will be ignored. So if new files are visible
+   * within this window, then the file will get selected in the next batch.
+   */
+  private val MIN_REMEMBER_DURATION = Minutes(1)
+
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
+
+  /**
+   * Calculate the number of last batches to remember, such that all the files selected in
+   * at least last MIN_REMEMBER_DURATION duration can be remembered.
+   */
+  def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
+    math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
+  }
 }
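As a quick sanity check on the rounding done by `calculateNumBatchesToRemember` above (illustrative values, not part of the patch): the remember window is always rounded up to a whole number of batches, so at least one minute of selected files is kept.

object RememberWindowSketch {
  // Hedged sketch of the rounding performed by calculateNumBatchesToRemember.
  def numBatchesToRemember(batchDurationMs: Long, minRememberMs: Long = 60000L): Int =
    math.ceil(minRememberMs.toDouble / batchDurationMs).toInt

  def main(args: Array[String]): Unit = {
    println(numBatchesToRemember(10000))  // 6 batches -> 60 seconds of selected files remembered
    println(numBatchesToRemember(45000))  // 2 batches -> 90 seconds remembered, never under a minute
  }
}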
CheckpointSuite.scala
@@ -265,7 +265,7 @@ class CheckpointSuite extends TestSuiteBase {
     // Verify whether files created have been recorded correctly or not
     var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
     assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
     assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
     assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
InputStreamsSuite.scala
@@ -28,9 +28,12 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -38,6 +41,9 @@ import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
 import org.apache.spark.rdd.RDD
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.fs.Path
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -91,54 +97,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
-  test("file input stream") {
-    // Disable manual clock as FileInputDStream does not work with manual clock
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-
-    // Set up the streaming context and input streams
-    val testDir = Utils.createTempDir()
-    val ssc = new StreamingContext(conf, batchDuration)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output = outputBuffer.flatMap(x => x)
-    val outputStream = new TestOutputStream(fileStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files in the temporary directory so that Spark Streaming can read data from it
-    val input = Seq(1, 2, 3, 4, 5)
-    val expectedOutput = input.map(_.toString)
-    Thread.sleep(1000)
-    for (i <- 0 until input.size) {
-      val file = new File(testDir, i.toString)
-      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
-      logInfo("Created file " + file)
-      Thread.sleep(batchDuration.milliseconds)
-      Thread.sleep(1000)
-    }
-    val startTime = System.currentTimeMillis()
-    Thread.sleep(1000)
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
-
-    // Verify whether data received by Spark Streaming was as expected
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    // (whether the elements were received one in each interval is not verified)
-    assert(output.toList === expectedOutput.toList)
-
-    Utils.deleteRecursively(testDir)
-
-    // Enable manual clock back again for other tests
-    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+  test("file input stream - newFilesOnly = true") {
+    testFileStream(newFilesOnly = true)
+  }
+
+  test("file input stream - newFilesOnly = false") {
+    testFileStream(newFilesOnly = false)
   }
 
   test("multi-thread receiver") {
@@ -180,7 +144,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     assert(output.sum === numTotalRecords)
   }
 
-  test("queue input stream - oneAtATime=true") {
+  test("queue input stream - oneAtATime = true") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val queue = new SynchronizedQueue[RDD[String]]()
@@ -223,7 +187,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
-  test("queue input stream - oneAtATime=false") {
+  test("queue input stream - oneAtATime = false") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val queue = new SynchronizedQueue[RDD[String]]()
@@ -268,6 +232,50 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       assert(output(i) === expectedOutput(i))
     }
   }
+
+  def testFileStream(newFilesOnly: Boolean) {
+    var ssc: StreamingContext = null
+    val testDir: File = null
+    try {
+      val testDir = Utils.createTempDir()
+      val existingFile = new File(testDir, "0")
+      Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+
+      Thread.sleep(1000)
+      // Set up the streaming context and input streams
+      val newConf = conf.clone.set(
+        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+      ssc = new StreamingContext(newConf, batchDuration)
+      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+      val outputStream = new TestOutputStream(fileStream, outputBuffer)
+      outputStream.register()
+      ssc.start()
+
+      // Create files in the directory
+      val input = Seq(1, 2, 3, 4, 5)
+      input.foreach { i =>
+        Thread.sleep(batchDuration.milliseconds)
+        val file = new File(testDir, i.toString)
+        Files.write(i + "\n", file, Charset.forName("UTF-8"))
+        logInfo("Created file " + file)
+      }
+
+      // Verify that all the files have been read
+      val expectedOutput = if (newFilesOnly) {
+        input.map(_.toString).toSet
+      } else {
+        (Seq(0) ++ input).map(_.toString).toSet
+      }
+      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        assert(outputBuffer.flatten.toSet === expectedOutput)
+      }
+    } finally {
+      if (ssc != null) ssc.stop()
+      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
 }
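For reference, an application exercises the same code path roughly the way the new `testFileStream` helper does; a hedged sketch, assuming a local master and a placeholder directory:

// Sketch only: directory, app name and master are placeholders.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("file-stream-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    // newFilesOnly = false also picks up files already present when the stream starts
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/tmp/watched-dir", (p: Path) => !p.getName.startsWith("."), newFilesOnly = false)
      .map(_._2.toString)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}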