diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index ca385972fb2ebe3add71881f18c3edc9761077d8..28a7b21b92bb5f6c1189bc3081e6b1bb90cf53be 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -27,6 +27,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     }
   }
 
+  test("basic checkpointing") {
+    val parCollection = sc.makeRDD(1 to 4)
+    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+    flatMappedRDD.checkpoint()
+    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
+    val result = flatMappedRDD.collect()
+    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+    assert(flatMappedRDD.collect() === result)
+  }
+
   test("RDDs with one-to-one dependencies") {
     testCheckpointing(_.map(x => x.toString))
     testCheckpointing(_.flatMap(x => 1 to x))
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..682d2745bffc43e8edad057f448e46372f56fb92
--- /dev/null
+++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
@@ -0,0 +1,287 @@
+package spark
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import com.google.common.io.Files
+
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
+
+class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
+  test("groupByKey") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val groups = pairs.groupByKey().collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
+  }
+
+  test("groupByKey with duplicates") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val groups = pairs.groupByKey().collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
+  }
+
+  test("groupByKey with negative key hash codes") {
+    val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
+    val groups = pairs.groupByKey().collect()
+    assert(groups.size === 2)
+    val valuesForMinus1 = groups.find(_._1 == -1).get._2
+    assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
+  }
+
+  test("groupByKey with many output partitions") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val groups = pairs.groupByKey(10).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
+  }
+
+  test("reduceByKey") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val sums = pairs.reduceByKey(_+_).collect()
+    assert(sums.toSet === Set((1, 7), (2, 1)))
+  }
+
+  test("reduceByKey with collectAsMap") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val sums = pairs.reduceByKey(_+_).collectAsMap()
+    assert(sums.size === 2)
+    assert(sums(1) === 7)
+    assert(sums(2) === 1)
+  }
+
+  test("reduceByKey with many output partitons") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val sums = pairs.reduceByKey(_+_, 10).collect()
+    assert(sums.toSet === Set((1, 7), (2, 1)))
+  }
+
+  test("reduceByKey with partitioner") {
+    val p = new Partitioner() {
+      def numPartitions = 2
+      def getPartition(key: Any) = key.asInstanceOf[Int]
+    }
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
+    val sums = pairs.reduceByKey(_+_)
+    assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+    assert(sums.partitioner === Some(p))
+    // count the dependencies to make sure there is only 1 ShuffledRDD
+    val deps = new HashSet[RDD[_]]()
+    def visit(r: RDD[_]) {
+      for (dep <- r.dependencies) {
+        deps += dep.rdd
+        visit(dep.rdd)
+      }
+    }
+    visit(sums)
+    assert(deps.size === 2) // ShuffledRDD, ParallelCollection
+  }
+
+  test("join") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.join(rdd2).collect()
+    assert(joined.size === 4)
+    assert(joined.toSet === Set(
+      (1, (1, 'x')),
+      (1, (2, 'x')),
+      (2, (1, 'y')),
+      (2, (1, 'z'))
+    ))
+  }
+
+  test("join all-to-all") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
+    val joined = rdd1.join(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (1, 'x')),
+      (1, (1, 'y')),
+      (1, (2, 'x')),
+      (1, (2, 'y')),
+      (1, (3, 'x')),
+      (1, (3, 'y'))
+    ))
+  }
+
+  test("leftOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.leftOuterJoin(rdd2).collect()
+    assert(joined.size === 5)
+    assert(joined.toSet === Set(
+      (1, (1, Some('x'))),
+      (1, (2, Some('x'))),
+      (2, (1, Some('y'))),
+      (2, (1, Some('z'))),
+      (3, (1, None))
+    ))
+  }
+
+  test("rightOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.rightOuterJoin(rdd2).collect()
+    assert(joined.size === 5)
+    assert(joined.toSet === Set(
+      (1, (Some(1), 'x')),
+      (1, (Some(2), 'x')),
+      (2, (Some(1), 'y')),
+      (2, (Some(1), 'z')),
+      (4, (None, 'w'))
+    ))
+  }
+
+  test("join with no matches") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+    val joined = rdd1.join(rdd2).collect()
+    assert(joined.size === 0)
+  }
+
+  test("join with many output partitions") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.join(rdd2, 10).collect()
+    assert(joined.size === 4)
+    assert(joined.toSet === Set(
+      (1, (1, 'x')),
+      (1, (2, 'x')),
+      (2, (1, 'y')),
+      (2, (1, 'z'))
+    ))
+  }
+
+  test("groupWith") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.groupWith(rdd2).collect()
+    assert(joined.size === 4)
+    assert(joined.toSet === Set(
+      (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
+      (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
+      (3, (ArrayBuffer(1), ArrayBuffer())),
+      (4, (ArrayBuffer(), ArrayBuffer('w')))
+    ))
+  }
+
+  test("zero-partition RDD") {
+    val emptyDir = Files.createTempDir()
+    val file = sc.textFile(emptyDir.getAbsolutePath)
+    assert(file.partitions.size == 0)
+    assert(file.collect().toList === Nil)
+    // Test that a shuffle on the file works, because this used to be a bug
+    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+  }
+
+  test("keys and values") {
+    val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+    assert(rdd.keys.collect().toList === List(1, 2))
+    assert(rdd.values.collect().toList === List("a", "b"))
+  }
+
+  test("default partitioner uses partition size") {
+    // specify 2000 partitions
+    val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+    // do a map, which loses the partitioner
+    val b = a.map(a => (a, (a * 2).toString))
+    // then a group by, and see we didn't revert to 2 partitions
+    val c = b.groupByKey()
+    assert(c.partitions.size === 2000)
+  }
+
+  test("default partitioner uses largest partitioner") {
+    val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
+    val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+    val c = a.join(b)
+    assert(c.partitions.size === 2000)
+  }
+
+  test("subtract") {
+    val a = sc.parallelize(Array(1, 2, 3), 2)
+    val b = sc.parallelize(Array(2, 3, 4), 4)
+    val c = a.subtract(b)
+    assert(c.collect().toSet === Set(1))
+    assert(c.partitions.size === a.partitions.size)
+  }
+
+  test("subtract with narrow dependency") {
+    // use a deterministic partitioner
+    val p = new Partitioner() {
+      def numPartitions = 5
+      def getPartition(key: Any) = key.asInstanceOf[Int]
+    }
+    // partitionBy so we have a narrow dependency
+    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    // more partitions/no partitioner so a shuffle dependency
+    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val c = a.subtract(b)
+    assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+    // Ideally we could keep the original partitioner...
+    assert(c.partitioner === None)
+  }
+
+  test("subtractByKey") {
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+    assert(c.partitions.size === a.partitions.size)
+  }
+
+  test("subtractByKey with narrow dependency") {
+    // use a deterministic partitioner
+    val p = new Partitioner() {
+      def numPartitions = 5
+      def getPartition(key: Any) = key.asInstanceOf[Int]
+    }
+    // partitionBy so we have a narrow dependency
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    // more partitions/no partitioner so a shuffle dependency
+    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+    assert(c.partitioner.get === p)
+  }
+
+  test("foldByKey") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val sums = pairs.foldByKey(0)(_+_).collect()
+    assert(sums.toSet === Set((1, 7), (2, 1)))
+  }
+
+  test("foldByKey with mutable result type") {
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
+    // Fold the values using in-place mutation
+    val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
+    assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
+    // Check that the mutable objects in the original RDD were not changed
+    assert(bufs.collect().toSet === Set(
+      (1, ArrayBuffer(1)),
+      (1, ArrayBuffer(2)),
+      (1, ArrayBuffer(3)),
+      (1, ArrayBuffer(1)),
+      (2, ArrayBuffer(1))))
+  }
+}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 16f93e71a3721029645afbc9dedc4681175a921c..99e433e3bd61c813ebc1126f58cba09129ecd262 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -6,8 +6,8 @@ import SparkContext._
 import spark.util.StatCounter
 import scala.math.abs
 
-class PartitioningSuite extends FunSuite with LocalSparkContext {
-  
+class PartitioningSuite extends FunSuite with SharedSparkContext {
+
   test("HashPartitioner equality") {
     val p2 = new HashPartitioner(2)
     val p4 = new HashPartitioner(4)
@@ -21,8 +21,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
   }
 
   test("RangePartitioner equality") {
-    sc = new SparkContext("local", "test")
-
     // Make an RDD where all the elements are the same so that the partition range bounds
     // are deterministically all the same.
     val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
@@ -50,7 +48,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
   }
 
   test("HashPartitioner not equal to RangePartitioner") {
-    sc = new SparkContext("local", "test")
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
     val hashP2 = new HashPartitioner(2)
@@ -61,8 +58,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partitioner preservation") {
-    sc = new SparkContext("local", "test")
-
     val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
 
     val grouped2 = rdd.groupByKey(2)
@@ -101,7 +96,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partitioning Java arrays should fail") {
-    sc = new SparkContext("local", "test")
     val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
     val arrPairs: RDD[(Array[Int], Int)] =
       sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
@@ -120,21 +114,20 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
     assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
   }
-  
-  test("Zero-length partitions should be correctly handled") {
+
+  test("zero-length partitions should be correctly handled") {
     // Create RDD with some consecutive empty partitions (including the "first" one)
-    sc = new SparkContext("local", "test")
     val rdd: RDD[Double] = sc
         .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
         .filter(_ >= 0.0)
-    
+
     // Run the partitions, including the consecutive empty ones, through StatCounter
     val stats: StatCounter = rdd.stats();
     assert(abs(6.0 - stats.sum) < 0.01);
     assert(abs(6.0/2 - rdd.mean) < 0.01);
     assert(abs(1.0 - rdd.variance) < 0.01);
     assert(abs(1.0 - rdd.stdev) < 0.01);
-    
+
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
 }
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index ed075f93ec5507adb3bed3b745da055b09575fa9..1c9ca5081120ae9fd9e14e2b1b1da1ce9d8c5c19 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -3,10 +3,9 @@ package spark
 import org.scalatest.FunSuite
 import SparkContext._
 
-class PipedRDDSuite extends FunSuite with LocalSparkContext {
-  
+class PipedRDDSuite extends FunSuite with SharedSparkContext {
+
   test("basic pipe") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
     val piped = nums.pipe(Seq("cat"))
@@ -20,12 +19,11 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("advanced pipe") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val bl = sc.broadcast(List("0"))
 
-    val piped = nums.pipe(Seq("cat"), 
-      Map[String, String](), 
+    val piped = nums.pipe(Seq("cat"),
+      Map[String, String](),
       (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
       (i:Int, f: String=> Unit) => f(i + "_"))
 
@@ -43,8 +41,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
 
     val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
     val d = nums1.groupBy(str=>str.split("\t")(0)).
-      pipe(Seq("cat"), 
-           Map[String, String](), 
+      pipe(Seq("cat"),
+           Map[String, String](),
            (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
            (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
     assert(d.size === 8)
@@ -59,7 +57,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("pipe with env variable") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
     val c = piped.collect()
@@ -69,7 +66,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("pipe with non-zero exit status") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val piped = nums.pipe("cat nonexistent_file")
     intercept[SparkException] {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 67f3332d444d83b68bdee5c3e0159d9a6e3edf8d..d8db69b1c918c0988b1910ef812abf01a3c1ace5 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -7,10 +7,9 @@ import org.scalatest.time.{Span, Millis}
 import spark.SparkContext._
 import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
 
-class RDDSuite extends FunSuite with LocalSparkContext {
+class RDDSuite extends FunSuite with SharedSparkContext {
 
   test("basic operations") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
@@ -46,7 +45,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("SparkContext.union") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
     assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
@@ -55,7 +53,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("aggregate") {
-    sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
     val emptyMap = new StringMap {
@@ -75,57 +72,14 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
   }
 
-  test("basic checkpointing") {
-    import java.io.File
-    val checkpointDir = File.createTempFile("temp", "")
-    checkpointDir.delete()
-
-    sc = new SparkContext("local", "test")
-    sc.setCheckpointDir(checkpointDir.toString)
-    val parCollection = sc.makeRDD(1 to 4)
-    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
-    flatMappedRDD.checkpoint()
-    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
-    val result = flatMappedRDD.collect()
-    Thread.sleep(1000)
-    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
-    assert(flatMappedRDD.collect() === result)
-
-    checkpointDir.deleteOnExit()
-  }
-
   test("basic caching") {
-    sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     assert(rdd.collect().toList === List(1, 2, 3, 4))
     assert(rdd.collect().toList === List(1, 2, 3, 4))
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
-  test("unpersist RDD") {
-    sc = new SparkContext("local", "test")
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    rdd.count
-    assert(sc.persistentRdds.isEmpty === false)
-    rdd.unpersist()
-    assert(sc.persistentRdds.isEmpty === true)
-
-    failAfter(Span(3000, Millis)) {
-      try {
-        while (! sc.getRDDStorageInfo.isEmpty) {
-          Thread.sleep(200)
-        }
-      } catch {
-        case _ => { Thread.sleep(10) }
-          // Do nothing. We might see exceptions because block manager
-          // is racing this thread to remove entries from the driver.
-      }
-    }
-    assert(sc.getRDDStorageInfo.isEmpty === true)
-  }
-
   test("caching with failures") {
-    sc = new SparkContext("local", "test")
     val onlySplit = new Partition { override def index: Int = 0 }
     var shouldFail = true
     val rdd = new RDD[Int](sc, Nil) {
@@ -148,7 +102,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("empty RDD") {
-    sc = new SparkContext("local", "test")
     val empty = new EmptyRDD[Int](sc)
     assert(empty.count === 0)
     assert(empty.collect().size === 0)
@@ -168,37 +121,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("cogrouped RDDs") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
-    val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
-
-    // Use cogroup function
-    val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
-    assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
-    assert(cogrouped(2) === (Seq("two"), Seq("two1")))
-    assert(cogrouped(3) === (Seq("three"), Seq()))
-
-    // Construct CoGroupedRDD directly, with map side combine enabled
-    val cogrouped1 = new CoGroupedRDD[Int](
-      Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
-      new HashPartitioner(3),
-      true).collectAsMap()
-    assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
-    assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
-    assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
-
-    // Construct CoGroupedRDD directly, with map side combine disabled
-    val cogrouped2 = new CoGroupedRDD[Int](
-      Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
-      new HashPartitioner(3),
-      false).collectAsMap()
-    assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
-    assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
-    assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
-  }
-
-  test("coalesced RDDs") {
-    sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
 
     val coalesced1 = data.coalesce(2)
@@ -236,7 +158,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("zipped RDDs") {
-    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))
     assert(zipped.glom().map(_.toList).collect().toList ===
@@ -248,7 +169,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("partition pruning") {
-    sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
     // Note that split number starts from 0, so > 8 means only 10th partition left.
     val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
@@ -260,7 +180,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
 
   test("mapWith") {
     import java.util.Random
-    sc = new SparkContext("local", "test")
     val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
     val randoms = ones.mapWith(
       (index: Int) => new Random(index + 42))
@@ -279,7 +198,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
 
   test("flatMapWith") {
     import java.util.Random
-    sc = new SparkContext("local", "test")
     val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
     val randoms = ones.flatMapWith(
       (index: Int) => new Random(index + 42))
@@ -301,7 +219,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
 
   test("filterWith") {
     import java.util.Random
-    sc = new SparkContext("local", "test")
     val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
     val sample = ints.filterWith(
       (index: Int) => new Random(index + 42))
@@ -319,7 +236,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("top with predefined ordering") {
-    sc = new SparkContext("local", "test")
     val nums = Array.range(1, 100000)
     val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
     val topK = ints.top(5)
@@ -328,7 +244,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
   }
 
   test("top with custom ordering") {
-    sc = new SparkContext("local", "test")
     val words = Vector("a", "b", "c", "d")
     implicit val ord = implicitly[Ordering[String]].reverse
     val rdd = sc.makeRDD(words, 2)
diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1da79f9824d11e0dd238e48d4f7177296833a542
--- /dev/null
+++ b/core/src/test/scala/spark/SharedSparkContext.scala
@@ -0,0 +1,25 @@
+package spark
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterAll
+
+/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
+trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
+
+  @transient private var _sc: SparkContext = _
+
+  def sc: SparkContext = _sc
+
+  override def beforeAll() {
+    _sc = new SparkContext("local", "test")
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    if (_sc != null) {
+      LocalSparkContext.stop(_sc)
+      _sc = null
+    }
+    super.afterAll()
+  }
+}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 0c1ec29f96c94e8adee1076948cfc1d7e9706f0f..950218fa28077ccc41a6e526029bdb59f59db18f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -16,54 +16,9 @@ import spark.rdd.ShuffledRDD
 import spark.SparkContext._
 
 class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
-
-  test("groupByKey") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with duplicates") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with negative key hash codes") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesForMinus1 = groups.find(_._1 == -1).get._2
-    assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with many output partitions") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
-    val groups = pairs.groupByKey(10).collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
   test("groupByKey with compression") {
     try {
-      System.setProperty("spark.blockManager.compress", "true")
+      System.setProperty("spark.shuffle.compress", "true")
       sc = new SparkContext("local", "test")
       val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
       val groups = pairs.groupByKey(4).collect()
@@ -77,234 +32,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     }
   }
 
-  test("reduceByKey") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("reduceByKey with collectAsMap") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collectAsMap()
-    assert(sums.size === 2)
-    assert(sums(1) === 7)
-    assert(sums(2) === 1)
-  }
-
-  test("reduceByKey with many output partitons") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_, 10).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("reduceByKey with partitioner") {
-    sc = new SparkContext("local", "test")
-    val p = new Partitioner() {
-      def numPartitions = 2
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
-    val sums = pairs.reduceByKey(_+_)
-    assert(sums.collect().toSet === Set((1, 4), (0, 1)))
-    assert(sums.partitioner === Some(p))
-    // count the dependencies to make sure there is only 1 ShuffledRDD
-    val deps = new HashSet[RDD[_]]()
-    def visit(r: RDD[_]) {
-      for (dep <- r.dependencies) {
-        deps += dep.rdd
-        visit(dep.rdd)
-      }
-    }
-    visit(sums)
-    assert(deps.size === 2) // ShuffledRDD, ParallelCollection
-  }
-
-  test("join") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-  test("join all-to-all") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 6)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (1, 'y')),
-      (1, (2, 'x')),
-      (1, (2, 'y')),
-      (1, (3, 'x')),
-      (1, (3, 'y'))
-    ))
-  }
-
-  test("leftOuterJoin") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.leftOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (1, Some('x'))),
-      (1, (2, Some('x'))),
-      (2, (1, Some('y'))),
-      (2, (1, Some('z'))),
-      (3, (1, None))
-    ))
-  }
-
-  test("rightOuterJoin") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.rightOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (Some(1), 'x')),
-      (1, (Some(2), 'x')),
-      (2, (Some(1), 'y')),
-      (2, (Some(1), 'z')),
-      (4, (None, 'w'))
-    ))
-  }
-
-  test("join with no matches") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 0)
-  }
-
-  test("join with many output partitions") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.join(rdd2, 10).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-  test("groupWith") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.groupWith(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
-      (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
-      (3, (ArrayBuffer(1), ArrayBuffer())),
-      (4, (ArrayBuffer(), ArrayBuffer('w')))
-    ))
-  }
-
-  test("zero-partition RDD") {
-    sc = new SparkContext("local", "test")
-    val emptyDir = Files.createTempDir()
-    val file = sc.textFile(emptyDir.getAbsolutePath)
-    assert(file.partitions.size == 0)
-    assert(file.collect().toList === Nil)
-    // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-  }
-
-  test("keys and values") {
-    sc = new SparkContext("local", "test")
-    val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
-    assert(rdd.keys.collect().toList === List(1, 2))
-    assert(rdd.values.collect().toList === List("a", "b"))
-  }
-
-  test("default partitioner uses partition size") {
-    sc = new SparkContext("local", "test")
-    // specify 2000 partitions
-    val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
-    // do a map, which loses the partitioner
-    val b = a.map(a => (a, (a * 2).toString))
-    // then a group by, and see we didn't revert to 2 partitions
-    val c = b.groupByKey()
-    assert(c.partitions.size === 2000)
-  }
-
-  test("default partitioner uses largest partitioner") {
-    sc = new SparkContext("local", "test")
-    val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
-    val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
-    val c = a.join(b)
-    assert(c.partitions.size === 2000)
-  }
-
-  test("subtract") {
-    sc = new SparkContext("local", "test")
-    val a = sc.parallelize(Array(1, 2, 3), 2)
-    val b = sc.parallelize(Array(2, 3, 4), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set(1))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  test("subtract with narrow dependency") {
-    sc = new SparkContext("local", "test")
-    // use a deterministic partitioner
-    val p = new Partitioner() {
-      def numPartitions = 5
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set((1, "a"), (3, "c")))
-    // Ideally we could keep the original partitioner...
-    assert(c.partitioner === None)
-  }
-
-  test("subtractByKey") {
-    sc = new SparkContext("local", "test")
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
-    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
-    val c = a.subtractByKey(b)
-    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  test("subtractByKey with narrow dependency") {
-    sc = new SparkContext("local", "test")
-    // use a deterministic partitioner
-    val p = new Partitioner() {
-      def numPartitions = 5
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
-    val c = a.subtractByKey(b)
-    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-    assert(c.partitioner.get === p)
-  }
-
   test("shuffle non-zero block size") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     val NUM_BLOCKS = 3
@@ -391,29 +118,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     // We should have at most 4 non-zero sized partitions
     assert(nonEmptyBlocks.size <= 4)
   }
-
-  test("foldByKey") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.foldByKey(0)(_+_).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("foldByKey with mutable result type") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
-    // Fold the values using in-place mutation
-    val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
-    assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
-    // Check that the mutable objects in the original RDD were not changed
-    assert(bufs.collect().toSet === Set(
-      (1, ArrayBuffer(1)),
-      (1, ArrayBuffer(2)),
-      (1, ArrayBuffer(3)),
-      (1, ArrayBuffer(1)),
-      (2, ArrayBuffer(1))))
-  }
 }
 
 object ShuffleSuite {
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index 495f957e53f251a9597782530f120aa4e055b08f..f7bf207c68c701730a218149e03a27f3cda5eec7 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -5,16 +5,14 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.ShouldMatchers
 import SparkContext._
 
-class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging {
-  
+class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
+
   test("sortByKey") {
-    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
-    assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))      
+    assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
   }
 
   test("large array") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 2)
@@ -24,7 +22,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
   }
 
   test("large array with one split") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 2)
@@ -32,9 +29,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
     assert(sorted.partitions.size === 1)
     assert(sorted.collect() === pairArr.sortBy(_._1))
   }
-  
+
   test("large array with many partitions") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 2)
@@ -42,9 +38,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
     assert(sorted.partitions.size === 20)
     assert(sorted.collect() === pairArr.sortBy(_._1))
   }
-  
+
   test("sort descending") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 2)
@@ -52,15 +47,13 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
   }
 
   test("sort descending with one split") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 1)
     assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
   }
-  
+
   test("sort descending with many partitions") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 2)
@@ -68,7 +61,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
   }
 
   test("more partitions than elements") {
-    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 30)
@@ -76,14 +68,12 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
   }
 
   test("empty RDD") {
-    sc = new SparkContext("local", "test")
     val pairArr = new Array[(Int, Int)](0)
     val pairs = sc.parallelize(pairArr, 2)
     assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
   }
 
   test("partition balancing") {
-    sc = new SparkContext("local", "test")
     val pairArr = (1 to 1000).map(x => (x, x)).toArray
     val sorted = sc.parallelize(pairArr, 4).sortByKey()
     assert(sorted.collect() === pairArr.sortBy(_._1))
@@ -99,7 +89,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
   }
 
   test("partition balancing for descending sort") {
-    sc = new SparkContext("local", "test")
     val pairArr = (1 to 1000).map(x => (x, x)).toArray
     val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
     assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..94776e75720b83fdfb9ef8667bde9ef31959b63c
--- /dev/null
+++ b/core/src/test/scala/spark/UnpersistSuite.scala
@@ -0,0 +1,30 @@
+package spark
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.{Span, Millis}
+import spark.SparkContext._
+
+class UnpersistSuite extends FunSuite with LocalSparkContext {
+  test("unpersist RDD") {
+    sc = new SparkContext("local", "test")
+    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+    rdd.count
+    assert(sc.persistentRdds.isEmpty === false)
+    rdd.unpersist()
+    assert(sc.persistentRdds.isEmpty === true)
+
+    failAfter(Span(3000, Millis)) {
+      try {
+        while (! sc.getRDDStorageInfo.isEmpty) {
+          Thread.sleep(200)
+        }
+      } catch {
+        case _ => { Thread.sleep(10) }
+          // Do nothing. We might see exceptions because block manager
+          // is racing this thread to remove entries from the driver.
+      }
+    }
+    assert(sc.getRDDStorageInfo.isEmpty === true)
+  }
+}
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 5f60aa75d7f0334a4d99662e04abc9d785c47b4d..96cb295f45b0ce7bb07240ec1bba726db026e725 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -17,9 +17,8 @@ object ZippedPartitionsSuite {
   }
 }
 
-class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
   test("print sizes") {
-    sc = new SparkContext("local", "test")
     val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
     val data3 = sc.makeRDD(Array(1.0, 2.0), 2)