diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 3da7152a093fe74f7776371f0efbd3d73e88c153..ca59f46843798a5286171f0a35cb54923d972518 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -22,6 +22,8 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("halting by voting") { diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 46b1574a6137703354425d50dcd844b3db1ee5b3..6a006e0697a78ea6e2da62b414fcd24f7194ec13 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -44,11 +44,9 @@ class SparkEnv ( blockManager.stop() blockManager.master.stop() actorSystem.shutdown() - // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit - Thread.sleep(100) + // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut + // down, but let's call it anyway in case it gets fixed in a later release actorSystem.awaitTermination() - // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit - Thread.sleep(100) } } diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala index b43730468e7ef09941ba6a744bbd40c723f71aec..d8be99dde71f83d2805c94ae86b62160121b1ed9 100644 --- a/core/src/test/scala/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/spark/AccumulatorSuite.scala @@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } @@ -54,6 +55,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } @@ -85,6 +88,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter } should produce [SparkException] sc.stop() sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } @@ -111,7 +116,9 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter mapAcc.value should contain (i -> i.toString) } sc.stop() - sc = null + sc = null + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } } diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala index 0738a2725b417d4ab40af34449f37fe7276040c3..2d3302f0aa2288a7f9970d616855ae1ece810401 100644 --- a/core/src/test/scala/spark/BroadcastSuite.scala +++ b/core/src/test/scala/spark/BroadcastSuite.scala @@ -12,6 +12,8 @@ class BroadcastSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic broadcast") { diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 97cfa9dad1d82b1fcf6d1206c6cc5b1a72353507..433d2fdc19bf14d7486c21bd836b498cb4bee9c0 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -27,21 +27,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter sc = null } System.clearProperty("spark.reducer.maxMbInFlight") + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("local-cluster format") { sc = new SparkContext("local-cluster[2,1,512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[2 , 1 , 512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[2, 1, 512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2) sc.stop() + System.clearProperty("spark.master.port") sc = null } diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala index 440582916111835bcfbc6f0635c68901202934a4..a3454f25f6f811ba63ada0460bc1ec709c6fa1b6 100644 --- a/core/src/test/scala/spark/FailureSuite.scala +++ b/core/src/test/scala/spark/FailureSuite.scala @@ -32,6 +32,8 @@ class FailureSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } // Run a 3-task map job in which task 1 deterministically fails once, and check diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index fd7a7bd589430e7f37d7e88e436d300eef82071c..a25b61dcbd6dab3d86139d510f4b3952a3265c9c 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -31,6 +31,8 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { if (tmpFile.exists) { tmpFile.delete() } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("Distributing files locally") { diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index 5c1577ed0bab8935d0ca12505446ac73143a4f95..554bea53a9181a1fa6c8dad13f9eec94a6be8dc9 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -20,6 +20,8 @@ class FileSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("text files") { diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index 5000fa7307aa3a7ccb0d193721241c6465902b5b..3dadc7acec1771b034ca8ff6ae7b1d8657149866 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -16,6 +16,8 @@ class PartitioningSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index add5221e305b1ce9e40be2003adaf11ea72c86d7..9b84b2922793352091ef5a6c91913f462f38ca48 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -13,6 +13,8 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic pipe") { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 95e402627c6659ad4db6bbca2edaf9b40eb81230..37a0ff09477b2c0dfc5909c3d7a0b41975be0eff 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -16,6 +16,8 @@ class RDDSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("basic operations") { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 068607824b53389dc0dcf1deff81aca78af111a9..7f8ec5d48fa664bbbf31e7dfa943d5c1461db01f 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -24,6 +24,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("groupByKey") { diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index 7677ac6db58431ca70485656e0e16aaee9ea6aa1..17f366212b7829ff04946fbe6ab6bff10212ff00 100644 --- a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -20,8 +20,9 @@ class DummyClass4(val d: DummyClass3) { val x: Int = 0 } -class SizeEstimatorSuite extends FunSuite - with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers { +class SizeEstimatorSuite + extends FunSuite with BeforeAndAfterAll with PrivateMethodTester with ShouldMatchers { + var oldArch: String = _ var oldOops: String = _ diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index c87595ecb350527b96b641ded68a6829a22f444e..1ad11ff4c3df7b70cb7eb054b9ffdfc888b5c003 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -12,7 +12,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with after { if (sc != null) { sc.stop() + sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } test("sortByKey") { diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala index 302f73118786dda09463d5db838b1ad5a2100815..e9b1837d894040f162802eeab4786efd5f5063ac 100644 --- a/core/src/test/scala/spark/ThreadingSuite.scala +++ b/core/src/test/scala/spark/ThreadingSuite.scala @@ -31,6 +31,8 @@ class ThreadingSuite extends FunSuite with BeforeAndAfter { sc.stop() sc = null } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index f61fd45ed35ec031ed0c78423075a458b300ca89..213a423fefaa45c664337a670cf4cd2e7c878e11 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -88,14 +88,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") - Thread.sleep(100) assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") - Thread.sleep(100) assert(store.getSingle("a3") === None, "a3 was in store") } @@ -107,14 +105,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) - Thread.sleep(100) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") === None, "a3 was in store") @@ -128,7 +124,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2 // from the same RDD assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") @@ -145,7 +140,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // At this point rdd_1_1 should've replaced rdd_0_1 assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") @@ -155,7 +149,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Put in more partitions from RDD 0; they should replace rdd_1_1 store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - Thread.sleep(100) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") @@ -186,7 +179,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -202,7 +194,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - Thread.sleep(100) assert(store.getLocalBytes("a2") != None, "a2 was not in store") assert(store.getLocalBytes("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -218,7 +209,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -234,7 +224,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getLocalBytes("a2") != None, "a2 was not in store") assert(store.getLocalBytes("a3") != None, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") @@ -261,7 +250,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a3") != None, "a1 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.getSingle("a1") == None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") assert(store.getSingle("a3") != None, "a3 was not in store") @@ -276,7 +264,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) assert(store.get("list2") != None, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") != None, "list3 was not in store") @@ -286,7 +273,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) - Thread.sleep(100) assert(store.get("list1") != None, "list1 was not in store") assert(store.get("list1").get.size == 2) assert(store.get("list2") != None, "list2 was not in store") @@ -304,7 +290,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) - Thread.sleep(100) // At this point LRU should not kick in because list3 is only on disk assert(store.get("list1") != None, "list2 was not in store") assert(store.get("list1").get.size === 2) @@ -320,7 +305,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER) - Thread.sleep(100) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2") != None, "list3 was not in store") assert(store.get("list2").get.size === 2) diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 0b5d439ca4c985de66fb41714f5cc79042d3c305..db78d06d4f0b68cb4731685c3eb2c84f3f7470ca 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -30,6 +30,8 @@ class ReplSuite extends FunSuite { spark.repl.Main.interp = null if (interp.sparkContext != null) interp.sparkContext.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") return out.toString }