From eca570f66a3d93bc1745a9fcf8410ad0a9db3e64 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sun, 7 Oct 2012 00:17:59 -0700
Subject: [PATCH] Removed the need to sleep in tests due to waiting for Akka to
 shut down

---
 bagel/src/test/scala/bagel/BagelSuite.scala      |  2 ++
 core/src/main/scala/spark/SparkEnv.scala         |  6 ++----
 core/src/test/scala/spark/AccumulatorSuite.scala |  9 ++++++++-
 core/src/test/scala/spark/BroadcastSuite.scala   |  2 ++
 core/src/test/scala/spark/DistributedSuite.scala |  6 ++++++
 core/src/test/scala/spark/FailureSuite.scala     |  2 ++
 core/src/test/scala/spark/FileServerSuite.scala  |  2 ++
 core/src/test/scala/spark/FileSuite.scala        |  2 ++
 .../src/test/scala/spark/PartitioningSuite.scala |  2 ++
 core/src/test/scala/spark/PipedRDDSuite.scala    |  2 ++
 core/src/test/scala/spark/RDDSuite.scala         |  2 ++
 core/src/test/scala/spark/ShuffleSuite.scala     |  2 ++
 .../test/scala/spark/SizeEstimatorSuite.scala    |  5 +++--
 core/src/test/scala/spark/SortingSuite.scala     |  3 +++
 core/src/test/scala/spark/ThreadingSuite.scala   |  2 ++
 .../scala/spark/storage/BlockManagerSuite.scala  | 16 ----------------
 repl/src/test/scala/spark/repl/ReplSuite.scala   |  2 ++
 17 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3da7152a09..ca59f46843 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -22,6 +22,8 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("halting by voting") {
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 46b1574a61..6a006e0697 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -44,11 +44,9 @@ class SparkEnv (
     blockManager.stop()
     blockManager.master.stop()
     actorSystem.shutdown()
-    // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
-    Thread.sleep(100)
+    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+    // down, but let's call it anyway in case it gets fixed in a later release
     actorSystem.awaitTermination()
-    // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
-    Thread.sleep(100)
   }
 }
 
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index b43730468e..d8be99dde7 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.master.port")
   }
 
@@ -54,6 +55,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       }
       sc.stop()
       sc = null
+      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+      System.clearProperty("spark.master.port")
     }
   }
 
@@ -85,6 +88,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       } should produce [SparkException]
       sc.stop()
       sc = null
+      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+      System.clearProperty("spark.master.port")
     }
   }
 
@@ -111,7 +116,9 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
         mapAcc.value should contain (i -> i.toString)
       }
       sc.stop()
-      sc = null      
+      sc = null
+      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+      System.clearProperty("spark.master.port")
     }
   }
 
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 0738a2725b..2d3302f0aa 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("basic broadcast") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 97cfa9dad1..433d2fdc19 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -27,21 +27,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc = null
     }
     System.clearProperty("spark.reducer.maxMbInFlight")
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
 
   test("local-cluster format") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
     sc.stop()
+    System.clearProperty("spark.master.port")
     sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
     sc.stop()
+    System.clearProperty("spark.master.port")
     sc = new SparkContext("local-cluster[2, 1, 512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
     sc.stop()
+    System.clearProperty("spark.master.port")
     sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)
     sc.stop()
+    System.clearProperty("spark.master.port")
     sc = null
   }
 
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 4405829161..a3454f25f6 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   // Run a 3-task map job in which task 1 deterministically fails once, and check
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index fd7a7bd589..a25b61dcbd 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -31,6 +31,8 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     if (tmpFile.exists) {
       tmpFile.delete()
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("Distributing files locally") {
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 5c1577ed0b..554bea53a9 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("text files") {
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 5000fa7307..3dadc7acec 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index add5221e30..9b84b29227 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("basic pipe") {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 95e402627c..37a0ff0947 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -16,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("basic operations") {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 068607824b..7f8ec5d48f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -24,6 +24,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
 
   test("groupByKey") {
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index 7677ac6db5..17f366212b 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -20,8 +20,9 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
 
-class SizeEstimatorSuite extends FunSuite
-    with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+class SizeEstimatorSuite
+  extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers {
+
   var oldArch: String = _
   var oldOops: String = _
 
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index c87595ecb3..1ad11ff4c3 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -12,7 +12,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
   after {
     if (sc != null) {
       sc.stop()
+      sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   test("sortByKey") {
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index 302f731187..e9b1837d89 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter {
       sc.stop()
       sc = null
     }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
   }
   
   
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f61fd45ed3..213a423fef 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -88,14 +88,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
-    Thread.sleep(100)
     assert(store.getSingle("a1") === None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
-    Thread.sleep(100)
     assert(store.getSingle("a3") === None, "a3 was in store")
   }
   
@@ -107,14 +105,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
-    Thread.sleep(100)
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
     assert(store.getSingle("a1") === None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    Thread.sleep(100)
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") === None, "a3 was in store")
@@ -128,7 +124,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
-    Thread.sleep(100)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2
     // from the same RDD
     assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
@@ -145,7 +140,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    Thread.sleep(100)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
@@ -155,7 +149,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     // Put in more partitions from RDD 0; they should replace rdd_1_1
     store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    Thread.sleep(100)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
@@ -186,7 +179,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
-    Thread.sleep(100)
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
     assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -202,7 +194,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
-    Thread.sleep(100)
     assert(store.getLocalBytes("a2") != None, "a2 was not in store")
     assert(store.getLocalBytes("a3") != None, "a3 was not in store")
     assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -218,7 +209,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
-    Thread.sleep(100)
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
     assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -234,7 +224,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
-    Thread.sleep(100)
     assert(store.getLocalBytes("a2") != None, "a2 was not in store")
     assert(store.getLocalBytes("a3") != None, "a3 was not in store")
     assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
@@ -261,7 +250,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.getSingle("a3") != None, "a1 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
     store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
-    Thread.sleep(100)
     assert(store.getSingle("a1") == None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -276,7 +264,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
     store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
-    Thread.sleep(100)
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     assert(store.get("list3") != None, "list3 was not in store")
@@ -286,7 +273,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list2").get.size == 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
-    Thread.sleep(100)
     assert(store.get("list1") != None, "list1 was not in store")
     assert(store.get("list1").get.size == 2)
     assert(store.get("list2") != None, "list2 was not in store")
@@ -304,7 +290,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
     store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
-    Thread.sleep(100)
     // At this point LRU should not kick in because list3 is only on disk
     assert(store.get("list1") != None, "list2 was not in store")
     assert(store.get("list1").get.size === 2)
@@ -320,7 +305,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
-    Thread.sleep(100)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
     assert(store.get("list2").get.size === 2)
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 0b5d439ca4..db78d06d4f 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -30,6 +30,8 @@ class ReplSuite extends FunSuite {
     spark.repl.Main.interp = null
     if (interp.sparkContext != null)
       interp.sparkContext.stop()
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
     return out.toString
   }
   
-- 
GitLab