Skip to content
Snippets Groups Projects
Commit ddcb976b authored by Tathagata Das's avatar Tathagata Das
Browse files

Made MasterFailureTest more robust.

parent 4b8402e9
No related branches found
No related tags found
No related merge requests found
...@@ -40,6 +40,8 @@ object MasterFailureTest extends Logging { ...@@ -40,6 +40,8 @@ object MasterFailureTest extends Logging {
println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
testUpdateStateByKey(directory, numBatches, batchDuration) testUpdateStateByKey(directory, numBatches, batchDuration)
println("\n\nSUCCESS\n\n")
} }
def testMap(directory: String, numBatches: Int, batchDuration: Duration) { def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
...@@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) ...@@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
override def run() { override def run() {
val localTestDir = Files.createTempDir() val localTestDir = Files.createTempDir()
val fs = testDir.getFileSystem(new Configuration()) var fs = testDir.getFileSystem(new Configuration())
val maxTries = 3
try { try {
Thread.sleep(5000) // To make sure that all the streaming context has been set up Thread.sleep(5000) // To make sure that all the streaming context has been set up
for (i <- 0 until input.size) { for (i <- 0 until input.size) {
...@@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) ...@@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i+1).toString) val localFile = new File(localTestDir, (i+1).toString)
val hadoopFile = new Path(testDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n") FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
//fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString)) var tries = 0
fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) var done = false
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) while (!done && tries < maxTries) {
tries += 1
try {
fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
done = true
} catch {
case ioe: IOException => {
fs = testDir.getFileSystem(new Configuration())
logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
}
}
}
if (!done)
logError("Could not generate file " + hadoopFile)
else
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval) Thread.sleep(interval)
localFile.delete() localFile.delete()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment