From 866e6949df8cdba0a4da20a1045333e419ed0e12 Mon Sep 17 00:00:00 2001
From: Denny <dennybritz@gmail.com>
Date: Wed, 18 Jul 2012 13:09:50 -0700
Subject: [PATCH] Always destroy the SparkContext in an after block in the
 unit tests.

Conflicts:

	core/src/test/scala/spark/ShuffleSuite.scala
---
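Note: every suite below applies the same pattern. The SparkContext moves from a
local val inside each test to a suite-level var, and ScalaTest's BeforeAndAfter
mixin stops it in an after block, so the context is destroyed even when an
assertion fails mid-test. A minimal sketch of the pattern, using a hypothetical
ExampleSuite rather than any of the files touched in this patch:

    import org.scalatest.{FunSuite, BeforeAndAfter}
    import spark.SparkContext

    class ExampleSuite extends FunSuite with BeforeAndAfter {
      // ExampleSuite is illustrative only; the real suites are listed below.
      var sc: SparkContext = _

      after {
        // Guard against tests that never created a context.
        if (sc != null) {
          sc.stop()
        }
      }

      test("example") {
        sc = new SparkContext("local", "test")
        assert(sc.parallelize(1 to 4).reduce(_ + _) === 10)
      }
    }

The after block runs after each test regardless of outcome, which is why the
per-test sc.stop() calls are removed in the hunks below.
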
 bagel/src/test/scala/bagel/BagelSuite.scala   | 17 ++++--
 .../src/test/scala/spark/BroadcastSuite.scala | 18 ++++--
 core/src/test/scala/spark/FailureSuite.scala  | 21 ++++---
 core/src/test/scala/spark/FileSuite.scala     | 42 ++++++-------
 .../scala/spark/KryoSerializerSuite.scala     |  3 +-
 .../test/scala/spark/PartitioningSuite.scala  | 24 +++++---
 core/src/test/scala/spark/PipedRDDSuite.scala | 19 ++++--
 core/src/test/scala/spark/RDDSuite.scala      | 18 ++++--
 core/src/test/scala/spark/ShuffleSuite.scala  | 61 +++++++++----------
 core/src/test/scala/spark/SortingSuite.scala  | 29 +++++----
 .../src/test/scala/spark/ThreadingSuite.scala | 25 +++++---
 11 files changed, 163 insertions(+), 114 deletions(-)

diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 0eda80af64..5ac7f5d381 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,6 +1,6 @@
 package spark.bagel
 
-import org.scalatest.{FunSuite, Assertions}
+import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
 import org.scalatest.prop.Checkers
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
@@ -13,9 +13,16 @@ import spark._
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
 
-class BagelSuite extends FunSuite with Assertions {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) sc.stop()
+  }
+
   test("halting by voting") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
     val msgs = sc.parallelize(Array[(String, TestMessage)]())
     val numSupersteps = 5
@@ -26,11 +33,10 @@ class BagelSuite extends FunSuite with Assertions {
       }
     for ((id, vert) <- result.collect)
       assert(vert.age === numSupersteps)
-    sc.stop()
   }
 
   test("halting by message silence") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(false, 0))))
     val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
     val numSupersteps = 5
@@ -48,6 +54,5 @@ class BagelSuite extends FunSuite with Assertions {
       }
     for ((id, vert) <- result.collect)
       assert(vert.age === numSupersteps)
-    sc.stop()
   }
 }
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 750703de30..d22c2d4295 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -1,23 +1,31 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
-class BroadcastSuite extends FunSuite {
+class BroadcastSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("basic broadcast") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
     assert(results.collect.toSet === Set((1, 10), (2, 10)))
-    sc.stop()
   }
 
   test("broadcast variables accessed in multiple threads") {
-    val sc = new SparkContext("local[10]", "test")
+    sc = new SparkContext("local[10]", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
     assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
-    sc.stop()
   }
 }
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 816411debe..7ef90cf2f6 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -1,6 +1,7 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.scalatest.prop.Checkers
 
 import scala.collection.mutable.ArrayBuffer
@@ -22,11 +23,20 @@ object FailureSuiteState {
   }
 }
 
-class FailureSuite extends FunSuite {
+class FailureSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   // Run a 3-task map job in which task 1 deterministically fails once, and check
   // whether the job completes successfully and we ran 4 tasks in total.
   test("failure in a single-stage job") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3, 3).map { x =>
       FailureSuiteState.synchronized {
         FailureSuiteState.tasksRun += 1
@@ -41,13 +51,12 @@ class FailureSuite extends FunSuite {
       assert(FailureSuiteState.tasksRun === 4)
     }
     assert(results.toList === List(1,4,9))
-    sc.stop()
     FailureSuiteState.clear()
   }
 
   // Run a map-reduce job in which a reduce task deterministically fails once.
   test("failure in a two-stage job") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
       case (k, v) => 
         FailureSuiteState.synchronized {
@@ -63,12 +72,11 @@ class FailureSuite extends FunSuite {
       assert(FailureSuiteState.tasksRun === 4)
     }
     assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
-    sc.stop()
     FailureSuiteState.clear()
   }
 
   test("failure because task results are not serializable") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
 
     val thrown = intercept[spark.SparkException] {
@@ -77,7 +85,6 @@ class FailureSuite extends FunSuite {
     assert(thrown.getClass === classOf[spark.SparkException])
     assert(thrown.getMessage.contains("NotSerializableException"))
 
-    sc.stop()
     FailureSuiteState.clear()
   }
 
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index b12014e6be..3a77ed0f13 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -6,13 +6,23 @@ import scala.io.Source
 
 import com.google.common.io.Files
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.apache.hadoop.io._
 
 import SparkContext._
 
-class FileSuite extends FunSuite {
+class FileSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("text files") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -23,11 +33,10 @@ class FileSuite extends FunSuite {
     assert(content === "1\n2\n3\n4\n")
     // Also try reading it in as a text file RDD
     assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
-    sc.stop()
   }
 
   test("SequenceFiles") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -35,11 +44,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable key") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) 
@@ -47,11 +55,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable value") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
@@ -59,11 +66,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable key and value") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -71,11 +77,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("implicit conversions in reading SequenceFiles") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -89,11 +94,10 @@ class FileSuite extends FunSuite {
     assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
     val output3 = sc.sequenceFile[IntWritable, String](outputDir)
     assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("object files of ints") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -101,11 +105,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as an object file
     val output = sc.objectFile[Int](outputDir)
     assert(output.collect().toList === List(1, 2, 3, 4))
-    sc.stop()
   }
 
   test("object files of complex types") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
@@ -113,12 +116,11 @@ class FileSuite extends FunSuite {
     // Try reading the output back as an object file
     val output = sc.objectFile[(Int, String)](outputDir)
     assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
-    sc.stop()
   }
 
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -126,12 +128,11 @@ class FileSuite extends FunSuite {
         outputDir)
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("read SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -139,6 +140,5 @@ class FileSuite extends FunSuite {
     val output =
         sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 }
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 06d446ea24..e889769b9a 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -8,7 +8,8 @@ import com.esotericsoftware.kryo._
 
 import SparkContext._
 
-class KryoSerializerSuite extends FunSuite {
+class KryoSerializerSuite extends FunSuite {
+
   test("basic types") {
     val ser = (new KryoSerializer).newInstance()
     def check[T](t: T) {
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 7f7f9493dc..dfe6a295c8 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -1,12 +1,23 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
 import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
 
-class PartitioningSuite extends FunSuite {
+class PartitioningSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+
   test("HashPartitioner equality") {
     val p2 = new HashPartitioner(2)
     val p4 = new HashPartitioner(4)
@@ -20,7 +31,7 @@ class PartitioningSuite extends FunSuite {
   }
 
   test("RangePartitioner equality") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
 
     // Make an RDD where all the elements are the same so that the partition range bounds
     // are deterministically all the same.
@@ -46,12 +57,10 @@ class PartitioningSuite extends FunSuite {
     assert(p4 != descendingP4)
     assert(descendingP2 != p2)
     assert(descendingP4 != p4)
-
-    sc.stop()
   }
 
   test("HashPartitioner not equal to RangePartitioner") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
     val hashP2 = new HashPartitioner(2)
@@ -59,11 +68,10 @@ class PartitioningSuite extends FunSuite {
     assert(hashP2 === hashP2)
     assert(hashP2 != rangeP2)
     assert(rangeP2 != hashP2)
-    sc.stop()
   }
 
   test("partitioner preservation") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
 
     val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
 
@@ -95,7 +103,5 @@ class PartitioningSuite extends FunSuite {
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
-
-    sc.stop()
   }
 }
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index d5dc2efd91..c0cf034c72 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -1,12 +1,21 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class PipedRDDSuite extends FunSuite {
-
+class PipedRDDSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("basic pipe") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
     val piped = nums.pipe(Seq("cat"))
@@ -18,18 +27,16 @@ class PipedRDDSuite extends FunSuite {
     assert(c(1) === "2")
     assert(c(2) === "3")
     assert(c(3) === "4")
-    sc.stop()
   }
 
   test("pipe with env variable") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
     val c = piped.collect()
     assert(c.size === 2)
     assert(c(0) === "LALALA")
     assert(c(1) === "LALALA")
-    sc.stop()
   }
 
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 7199b634b7..1d240b471f 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -2,11 +2,21 @@ package spark
 
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class RDDSuite extends FunSuite {
+class RDDSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("basic operations") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     assert(nums.reduce(_ + _) === 10)
@@ -18,11 +28,10 @@ class RDDSuite extends FunSuite {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
-    sc.stop()
   }
 
   test("aggregate") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
     val emptyMap = new StringMap {
@@ -40,6 +49,5 @@ class RDDSuite extends FunSuite {
     }
     val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
     assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
-    sc.stop()
   }
 }
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 00b24464a6..c45c5935de 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,6 +1,7 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.scalatest.prop.Checkers
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
@@ -12,9 +13,18 @@ import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
 
-class ShuffleSuite extends FunSuite {
+class ShuffleSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("groupByKey") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -22,11 +32,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("groupByKey with duplicates") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -34,11 +43,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("groupByKey with negative key hash codes") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -46,11 +54,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
   
   test("groupByKey with many output partitions") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey(10).collect()
     assert(groups.size === 2)
@@ -58,37 +65,33 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("reduceByKey") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
-    sc.stop()
   }
 
   test("reduceByKey with collectAsMap") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_).collectAsMap()
     assert(sums.size === 2)
     assert(sums(1) === 7)
     assert(sums(2) === 1)
-    sc.stop()
   }
 
   test("reduceByKey with many output partitons") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_, 10).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
-    sc.stop()
   }
 
   test("join") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2).collect()
@@ -99,11 +102,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, 'y')),
       (2, (1, 'z'))
     ))
-    sc.stop()
   }
 
   test("join all-to-all") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
     val joined = rdd1.join(rdd2).collect()
@@ -116,11 +118,10 @@ class ShuffleSuite extends FunSuite {
       (1, (3, 'x')),
       (1, (3, 'y'))
     ))
-    sc.stop()
   }
 
   test("leftOuterJoin") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.leftOuterJoin(rdd2).collect()
@@ -132,11 +133,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, Some('z'))),
       (3, (1, None))
     ))
-    sc.stop()
   }
 
   test("rightOuterJoin") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.rightOuterJoin(rdd2).collect()
@@ -148,20 +148,18 @@ class ShuffleSuite extends FunSuite {
       (2, (Some(1), 'z')),
       (4, (None, 'w'))
     ))
-    sc.stop()
   }
 
   test("join with no matches") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 0)
-    sc.stop()
   }
 
   test("join with many output partitions") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2, 10).collect()
@@ -172,11 +170,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, 'y')),
       (2, (1, 'z'))
     ))
-    sc.stop()
   }
 
   test("groupWith") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.groupWith(rdd2).collect()
@@ -187,17 +184,15 @@ class ShuffleSuite extends FunSuite {
       (3, (ArrayBuffer(1), ArrayBuffer())),
       (4, (ArrayBuffer(), ArrayBuffer('w')))
     ))
-    sc.stop()
   }
   
   test("zero-partition RDD") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val emptyDir = Files.createTempDir()
     val file = sc.textFile(emptyDir.getAbsolutePath)
     assert(file.splits.size == 0)
     assert(file.collect().toList === Nil)
     // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-    sc.stop()
-  } 
+    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+  }
 }
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index caff884966..ced3c66d38 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -1,50 +1,55 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class SortingSuite extends FunSuite {
+class SortingSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("sortByKey") {
-      val sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
       val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
-      assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
-      sc.stop()
+      assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
   }
 
   test("sortLargeArray") {
-      val sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
       val rand = new scala.util.Random()
       val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
       val pairs = sc.parallelize(pairArr)
       assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-      sc.stop()
   }
 
   test("sortDescending") {
-      val sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
       val rand = new scala.util.Random()
       val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
       val pairs = sc.parallelize(pairArr)
       assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
-      sc.stop()
   }
 
   test("morePartitionsThanElements") {
-      val sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
       val rand = new scala.util.Random()
       val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
       val pairs = sc.parallelize(pairArr, 30)
       assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-      sc.stop()
   }
 
   test("emptyRDD") {
-      val sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
       val rand = new scala.util.Random()
       val pairArr = new Array[(Int, Int)](0)
       val pairs = sc.parallelize(pairArr)
       assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-      sc.stop()
   }
 }
 
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index d38e72d8b8..d617e8c15f 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
 import SparkContext._
 
@@ -21,9 +22,19 @@ object ThreadingSuiteState {
   }
 }
 
-class ThreadingSuite extends FunSuite {
+class ThreadingSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+
   test("accessing SparkContext form a different thread") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var answer1: Int = 0
@@ -38,11 +49,10 @@ class ThreadingSuite extends FunSuite {
     sem.acquire()
     assert(answer1 === 55)
     assert(answer2 === 1)
-    sc.stop()
   }
 
   test("accessing SparkContext form multiple threads") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var ok = true
@@ -67,11 +77,10 @@ class ThreadingSuite extends FunSuite {
     if (!ok) {
       fail("One or more threads got the wrong answer from an RDD operation")
     }
-    sc.stop()
   }
 
   test("accessing multi-threaded SparkContext form multiple threads") {
-    val sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var ok = true
@@ -96,13 +105,12 @@ class ThreadingSuite extends FunSuite {
     if (!ok) {
       fail("One or more threads got the wrong answer from an RDD operation")
     }
-    sc.stop()
   }
 
   test("parallel job execution") {
     // This test launches two jobs with two threads each on a 4-core local cluster. Each thread
     // waits until there are 4 threads running at once, to test that both jobs have been launched.
-    val sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test")
     val nums = sc.parallelize(1 to 2, 2)
     val sem = new Semaphore(0)
     ThreadingSuiteState.clear()
@@ -132,6 +140,5 @@ class ThreadingSuite extends FunSuite {
     if (ThreadingSuiteState.failed.get()) {
       fail("One or more threads didn't see runningThreads = 4")
     }
-    sc.stop()
   }
 }
-- 
GitLab