Skip to content
Snippets Groups Projects
Commit 33597810 authored by mwws's avatar mwws Committed by Sean Owen
Browse files

[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard

## What changes were proposed in this pull request?
make StreamingContext.textFileStream support wildcard
like /home/user/*/file

## How was this patch tested?
I did manual test and added a new unit test case

Author: mwws <wei.mao@intel.com>
Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com>

Closes #12752 from mwws/SPARK_FileStream.
parent 8beae591
No related branches found
No related tags found
No related merge requests found
......@@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val filter = new PathFilter {
val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
......
......@@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testFileStream(newFilesOnly = false)
}
test("file input stream - wildcard") {
var testDir: File = null
try {
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
val pathWithWildCard = testDir.toString + "/*/"
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
val batchCounter = new BatchCounter(ssc)
// monitor "testDir/*/"
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
pathWithWildCard).map(_._2.toString)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()
ssc.start()
// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)
def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === data)
}
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
// Over time, create files in the temp directory 1
val input2 = Seq(6, 7, 8, 9, 10)
input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment