From ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Fri, 15 Feb 2013 06:54:47 +0000
Subject: [PATCH] Made MasterFailureTest more robust.

---
 .../streaming/util/MasterFailureTest.scala    | 26 ++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 83d8591a3a..776e676063 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -40,6 +40,8 @@ object MasterFailureTest extends Logging {
 
     println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
     testUpdateStateByKey(directory, numBatches, batchDuration)
+
+    println("\n\nSUCCESS\n\n")
   }
 
   def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
@@ -347,7 +349,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
 
   override def run() {
     val localTestDir = Files.createTempDir()
-    val fs = testDir.getFileSystem(new Configuration())
+    var fs = testDir.getFileSystem(new Configuration())
+    val maxTries = 3
     try {
       Thread.sleep(5000) // To make sure that all the streaming context has been set up
       for (i <- 0 until input.size) {
@@ -355,9 +358,24 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
         val localFile = new File(localTestDir, (i+1).toString)
         val hadoopFile = new Path(testDir, (i+1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
-        //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString))
-        fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
-        logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+        var tries = 0
+	var done = false
+        while (!done && tries < maxTries) {
+          tries += 1
+          try {
+            fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+	    done = true
+	  } catch {
+	    case ioe: IOException => { 
+              fs = testDir.getFileSystem(new Configuration()) 
+              logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+	    }
+	  }
+        }
+	if (!done) 
+          logError("Could not generate file " + hadoopFile)
+        else 
+          logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
         Thread.sleep(interval)
         localFile.delete()
       }
-- 
GitLab