diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6cc0b1260b8890f5ddb033fdba9b2283f8ebd14b..6abb5c4792cc24954223c109b08671168b67c8c4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -20,6 +20,7 @@ import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
 import spark.partial.GroupedCountEvaluator
 import spark.partial.PartialResult
+import spark.rdd.CoalescedRDD
 import spark.rdd.CartesianRDD
 import spark.rdd.FilteredRDD
 import spark.rdd.FlatMappedRDD
@@ -231,6 +232,11 @@ abstract class RDD[T: ClassManifest](
 
   def distinct(): RDD[T] = distinct(splits.size)
 
+  /**
+   * Return a new RDD that is reduced into `numSplits` partitions.
+   */
+  def coalesce(numSplits: Int): RDD[T] = new CoalescedRDD(this, numSplits)
+
   /**
    * Return a sampled subset of this RDD.
    */
@@ -649,7 +655,6 @@ abstract class RDD[T: ClassManifest](
    */
   private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
     clearDependencies()
-    dependencies_ = null
     splits_ = null
     deps = null    // Forget the constructor argument for dependencies too
   }
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 843e1bd18bdbf766819108b9abcc5eec09afa3f5..2810631b4158a8062ab9190bbbeffa7a9409803a 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -52,6 +52,11 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
     fromRDD(srdd.filter(x => f(x).booleanValue()))
 
+  /**
+   * Return a new RDD that is reduced into `numSplits` partitions.
+   */
+  def coalesce(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numSplits))
+
   /**
    * Return a sampled subset of this RDD.
    */
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8ce32e0e2fd21b57ef28f9c8c044ef2727be64e8..8a123bdb47297a1dcc0484e6cc4a9a5639e27cc2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -62,6 +62,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
 
+  /**
+   * Return a new RDD that is reduced into `numSplits` partitions.
+   */
+  def coalesce(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numSplits))
+
   /**
    * Return a sampled subset of this RDD.
    */
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index ac31350ec3374e6a6396087b2c37681ecfe79f81..23e7ae2726f64c75ac0420cfed565e8446b17db4 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -38,6 +38,11 @@ JavaRDDLike[T, JavaRDD[T]] {
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
     wrapRDD(rdd.filter((x => f(x).booleanValue())))
 
+  /**
+   * Return a new RDD that is reduced into `numSplits` partitions.
+   */
+  def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits)
+
   /**
    * Return a sampled subset of this RDD.
    */
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index c7f226044d1e9a9a11db7487a64c31c51402af46..b6ec664d7e81bb581f1be553b3b1dfaafca61865 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
   logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
   
-  val thisInstance = this
   val selectorThread = new Thread("connection-manager-thread") {
-    override def run() {
-      thisInstance.run()
-    }
+    override def run() = ConnectionManager.this.run()
   }
   selectorThread.setDaemon(true)
   selectorThread.start()
 
-  def run() {
+  private def run() {
     try {
       while(!selectorThread.isInterrupted) {
-        for( (connectionManagerId, sendingConnection) <- connectionRequests) {
+        for ((connectionManagerId, sendingConnection) <- connectionRequests) {
           sendingConnection.connect() 
           addConnection(sendingConnection)
           connectionRequests -= connectionManagerId
         }
         sendMessageRequests.synchronized {
-          while(!sendMessageRequests.isEmpty) {
+          while (!sendMessageRequests.isEmpty) {
             val (message, connection) = sendMessageRequests.dequeue
             connection.send(message)
           }
         }
 
-        while(!keyInterestChangeRequests.isEmpty) {
+        while (!keyInterestChangeRequests.isEmpty) {
           val (key, ops) = keyInterestChangeRequests.dequeue
           val connection = connectionsByKey(key)
           val lastOps = key.interestOps()
@@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
           if (key.isValid) {
             if (key.isAcceptable) {
               acceptConnection(key)
-            } else 
-            if (key.isConnectable) {
+            } else if (key.isConnectable) {
               connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect()
-            } else 
-            if (key.isReadable) {
+            } else if (key.isReadable) {
               connectionsByKey(key).read()
-            } else 
-            if (key.isWritable) {
+            } else if (key.isWritable) {
               connectionsByKey(key).write()
             }
           }
@@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     }
   }
   
-  def acceptConnection(key: SelectionKey) {
+  private def acceptConnection(key: SelectionKey) {
     val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
     val newChannel = serverChannel.accept()
     val newConnection = new ReceivingConnection(newChannel, selector)
@@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]")
   }
 
-  def addConnection(connection: Connection) {
+  private def addConnection(connection: Connection) {
     connectionsByKey += ((connection.key, connection))
     if (connection.isInstanceOf[SendingConnection]) {
       val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     connection.onClose(removeConnection)
   }
 
-  def removeConnection(connection: Connection) {
+  private def removeConnection(connection: Connection) {
     connectionsByKey -= connection.key
     if (connection.isInstanceOf[SendingConnection]) {
       val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -222,16 +216,16 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
     }
   }
 
-  def handleConnectionError(connection: Connection, e: Exception) {
+  private def handleConnectionError(connection: Connection, e: Exception) {
     logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId)
     removeConnection(connection)
   }
 
-  def changeConnectionKeyInterest(connection: Connection, ops: Int) {
+  private def changeConnectionKeyInterest(connection: Connection, ops: Int) {
     keyInterestChangeRequests += ((connection.key, ops))  
   }
 
-  def receiveMessage(connection: Connection, message: Message) {
+  private def receiveMessage(connection: Connection, message: Message) {
     val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
     logDebug("Received [" + message + "] from [" + connectionManagerId + "]") 
     val runnable = new Runnable() {
@@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
 private[spark] object ConnectionManager {
 
   def main(args: Array[String]) {
-  
     val manager = new ConnectionManager(9999)
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 
       println("Received [" + msg + "] from [" + id + "]")
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 2c022f88e0defb7445463cb78e4ee7c57042d68d..17989c5ce5d18ea10cc0cdf812673f6a62c3df87 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -11,10 +11,6 @@ private[spark]
 class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
   extends RDD[T](sc, Nil) {
 
-  @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
-    new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
-  }).toArray
-
   @transient lazy val locations_  = {
     val blockManager = SparkEnv.get.blockManager
     /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
@@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     HashMap(blockIds.zip(locations):_*)
   }
 
-  override def getSplits = splits_
+  override def getSplits: Array[Split] = (0 until blockIds.size).map(i => {
+    new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+  }).toArray
+
 
   override def compute(split: Split, context: TaskContext): Iterator[T] = {
     val blockManager = SparkEnv.get.blockManager
@@ -34,11 +33,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     }
   }
 
-  override def getPreferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split): Seq[String] =
     locations_(split.asInstanceOf[BlockRDDSplit].blockId)
 
-  override def clearDependencies() {
-    splits_ = null
-  }
 }
 
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 0f9ca0653198f35c25122c0dedfe22ef93849762..41cbbd0093dea6e931a3199776478ceaad6fecd8 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -45,7 +45,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
     array
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     val currSplit = split.asInstanceOf[CartesianSplit]
     rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
   }
@@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
   )
 
   override def clearDependencies() {
+    super.clearDependencies()
     rdd1 = null
     rdd2 = null
   }
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 96b593ba7ca6d282c7158d9049b054afc8a27ea5..3558d4673f216070f5a8085f1deaf1c92f070f99 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
 
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
-  @transient val splits_ : Array[Split] = {
+  override def getSplits: Array[Split] = {
     val dirContents = fs.listStatus(new Path(checkpointPath))
     val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
     val numSplits = splitFiles.size
@@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
   checkpointData = Some(new RDDCheckpointData[T](this))
   checkpointData.get.cpFile = Some(checkpointPath)
 
-  override def getSplits = splits_
-
   override def getPreferredLocations(split: Split): Seq[String] = {
     val status = fs.getFileStatus(new Path(checkpointPath))
     val locations = fs.getFileBlockLocations(status, 0, status.getLen)
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 4893fe8d784bf14cba83d55298c5bd533083bbda..0a1e2cbee0964d24349b48d1ec2a3a6df0dca423 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -43,42 +43,38 @@ private[spark] class CoGroupAggregator
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
 
-  val aggr = new CoGroupAggregator
+  private val aggr = new CoGroupAggregator
 
-  @transient var deps_ = {
-    val deps = new ArrayBuffer[Dependency[_]]
-    for ((rdd, index) <- rdds.zipWithIndex) {
+  override def getDependencies: Seq[Dependency[_]] = {
+    rdds.map { rdd =>
       if (rdd.partitioner == Some(part)) {
         logInfo("Adding one-to-one dependency with " + rdd)
-        deps += new OneToOneDependency(rdd)
+        new OneToOneDependency(rdd)
       } else {
         logInfo("Adding shuffle dependency with " + rdd)
         val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
-        deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+        new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
       }
     }
-    deps.toList
   }
 
-  override def getDependencies = deps_
-
-  @transient var splits_ : Array[Split] = {
+  override def getSplits: Array[Split] = {
     val array = new Array[Split](part.numPartitions)
     for (i <- 0 until array.size) {
-      array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+      // Each CoGroupSplit will have a dependency per contributing RDD
+      array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) =>
+        // Assume each RDD contributed a single dependency, and get it
         dependencies(j) match {
           case s: ShuffleDependency[_, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+            new ShuffleCoGroupSplitDep(s.shuffleId)
           case _ =>
-            new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep
+            new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
         }
       }.toList)
     }
     array
   }
 
-  override def getSplits = splits_
-  
   override val partitioner = Some(part)
 
   override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
@@ -97,7 +93,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
       }
     }
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
+      case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
         for ((k, v) <- rdd.iterator(itsSplit, context)) {
           getSeq(k.asInstanceOf[K])(depNum) += v
@@ -115,8 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
   }
 
   override def clearDependencies() {
-    deps_ = null
-    splits_ = null
+    super.clearDependencies()
     rdds = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 4c57434b65f9a2f8fcafdcfa8225d9297cefbe16..fcd26da43abe5a186c27a938755e76276deba356 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -50,14 +50,15 @@ class CoalescedRDD[T: ClassManifest](
     }
   }
 
-  override def getDependencies: Seq[Dependency[_]] = List(
-    new NarrowDependency(prev) {
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
       def getParents(id: Int): Seq[Int] =
         splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
-    }
-  )
+    })
+  }
 
   override def clearDependencies() {
+    super.clearDependencies()
     prev = null
   }
 }
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 6dbe235bd9f15b304024354867b20561cb71f74e..93e398ea2b34a604d339748abe468455f3e23072 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -7,7 +7,7 @@ private[spark] class FilteredRDD[T: ClassManifest](
     f: T => Boolean)
   extends RDD[T](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys
 
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index 1b604c66e2fa52c849c38b1f16738b0949a7b405..8c2a610593306b88ba023d92123aabb0d2d8d590 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -9,7 +9,7 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
     f: T => TraversableOnce[U])
   extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     firstParent[T].iterator(split, context).flatMap(f)
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index 051bffed192bc39a9ac3084ffbec49acdb524340..70b9b4e34ed49395ec0b2371f8a9ac3027e30676 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -5,7 +5,7 @@ import spark.{RDD, Split, TaskContext}
 private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
   extends RDD[Array[T]](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     Array(firstParent[T].iterator(split, context).toArray).iterator
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index f547f53812661da253353384e43dbe2702a3cb68..854993737bc472b817c2c0952836b0af5c451015 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -45,10 +45,9 @@ class HadoopRDD[K, V](
   extends RDD[(K, V)](sc, Nil) {
 
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
-  @transient
-  val splits_ : Array[Split] = {
+  override def getSplits: Array[Split] = {
     val inputFormat = createInputFormat(conf)
     val inputSplits = inputFormat.getSplits(conf, minSplits)
     val array = new Array[Split](inputSplits.size)
@@ -63,8 +62,6 @@ class HadoopRDD[K, V](
       .asInstanceOf[InputFormat[K, V]]
   }
 
-  override def getSplits = splits_
-
   override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopSplit]
     var reader: RecordReader[K, V] = null
@@ -109,7 +106,7 @@ class HadoopRDD[K, V](
     }
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopSplit]
     hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index 073f7d7d2aad251c4240bd665b9fc02e90eec8a8..7b0b4525c7e626b803dd84c88a5fce27927a32f7 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -13,7 +13,7 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
   override val partitioner =
     if (preservesPartitioning) firstParent[T].partitioner else None
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     f(firstParent[T].iterator(split, context))
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 2ddc3d01b647a573d85aa0c3622341fdd3ed1adb..c6dc1080a9089d48d104f1a8f86d97861861ee16 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -15,7 +15,7 @@ class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
     preservesPartitioning: Boolean
   ) extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override val partitioner = if (preservesPartitioning) prev.partitioner else None
 
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 5466c9c657fcb03b20f578ce4456aa4c5cc0c1ed..6074f411e3b48a3df92ab7edf0571d6f8bb3c757 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -6,7 +6,7 @@ private[spark]
 class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
   extends RDD[U](prev) {
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext) =
     firstParent[T].iterator(split, context).map(f)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index c3b155fcbddd6545b2aa45fa45c9bd610a56d9fa..345ae79d74d0dd8461b2dfd8e42e4b9441df4207 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -29,7 +29,7 @@ class NewHadoopRDD[K, V](
   with HadoopMapReduceUtil {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
   private val jobtrackerId: String = {
@@ -39,7 +39,7 @@ class NewHadoopRDD[K, V](
 
   @transient private val jobId = new JobID(jobtrackerId, id)
 
-  @transient private val splits_ : Array[Split] = {
+  override def getSplits: Array[Split] = {
     val inputFormat = inputFormatClass.newInstance
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -50,8 +50,6 @@ class NewHadoopRDD[K, V](
     result
   }
 
-  override def getSplits = splits_
-
   override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopSplit]
     val conf = confBroadcast.value.value
@@ -85,7 +83,7 @@ class NewHadoopRDD[K, V](
     }
   }
 
-  override def getPreferredLocations(split: Split) = {
+  override def getPreferredLocations(split: Split): Seq[String] = {
     val theSplit = split.asInstanceOf[NewHadoopSplit]
     theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
   }
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index a50ce751718c4b640aaf0b3854c03c8e54d9d9e5..d1553181c166df10e3f45ed7230ad7d971f49455 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -37,6 +37,6 @@ class PartitionPruningRDD[T: ClassManifest](
   override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
     split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
 
-  override protected def getSplits =
+  override protected def getSplits: Array[Split] =
     getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
 }
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index 6631f83510cb6851a9a6002792415d6ca556f1c8..56032a8659a05ab3fa0ca5eda28fafcd79d3074e 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -27,7 +27,7 @@ class PipedRDD[T: ClassManifest](
   // using a standard StringTokenizer (i.e. by spaces)
   def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
 
-  override def getSplits = firstParent[T].splits
+  override def getSplits: Array[Split] = firstParent[T].splits
 
   override def compute(split: Split, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index e24ad23b2142315a67d6f79c53fc9b0282c613a7..f2a144e2e025f5e3b1c279bdfa6da18daecc663e 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -19,17 +19,15 @@ class SampledRDD[T: ClassManifest](
     seed: Int)
   extends RDD[T](prev) {
 
-  @transient var splits_ : Array[Split] = {
+  override def getSplits: Array[Split] = {
     val rg = new Random(seed)
     firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
   }
 
-  override def getSplits = splits_
-
-  override def getPreferredLocations(split: Split) =
+  override def getPreferredLocations(split: Split): Seq[String] =
     firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
 
-  override def compute(splitIn: Split, context: TaskContext) = {
+  override def compute(splitIn: Split, context: TaskContext): Iterator[T] = {
     val split = splitIn.asInstanceOf[SampledRDDSplit]
     if (withReplacement) {
       // For large datasets, the expected number of occurrences of each element in a sample with
@@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest](
       firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
     }
   }
-
-  override def clearDependencies() {
-    splits_ = null
-  }
 }
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index d3964786736156e3378fbef1d50f306e99ba0607..bf69b5150bcc4ac0e8ed452254b84a735f2fd63d 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -22,7 +22,9 @@ class ShuffledRDD[K, V](
 
   override val partitioner = Some(part)
 
-  override def getSplits = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+  override def getSplits: Array[Split] = {
+    Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) 
+  }
 
   override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
     val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 26a2d511f2670621de66d5c5f9f271ff130ce031..ebc006822804f5d4204cbb624c9500bcfda1e8f6 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest](
 
   override def getPreferredLocations(s: Split): Seq[String] =
     s.asInstanceOf[UnionSplit[T]].preferredLocations()
-
-  override def clearDependencies() {
-    rdds = null
-  }
 }
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e5df6d8c7239b83a328f1b898e8c395c9c3e459c..1ce70268bb491bcd7f2eaf61b4bf778109ee000d 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -29,8 +29,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
     sc: SparkContext,
     var rdd1: RDD[T],
     var rdd2: RDD[U])
-  extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2)))
-  with Serializable {
+  extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
 
   override def getSplits: Array[Split] = {
     if (rdd1.splits.size != rdd2.splits.size) {
@@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def clearDependencies() {
+    super.clearDependencies()
     rdd1 = null
     rdd2 = null
   }
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0b74607fb85a6a5d0456b58744eba49bc1f98585..0d08fd239632b286b87848710f089684a9da5493 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("CoalescedRDD") {
-    testCheckpointing(new CoalescedRDD(_, 2))
+    testCheckpointing(_.coalesce(2))
 
     // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
     // Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
     // so only the RDD will reduce in serialized size, not the splits.
-    testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
+    testParentCheckpointing(_.coalesce(2), true, false)
 
     // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
     // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index fe7deb10d63b001ca5a3db3e9398dcb38c233126..ffa866de7532fc4bae950edfda3e34d40590c7d9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
 
-    val coalesced1 = new CoalescedRDD(data, 2)
+    val coalesced1 = data.coalesce(2)
     assert(coalesced1.collect().toList === (1 to 10).toList)
     assert(coalesced1.glom().collect().map(_.toList).toList ===
       List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
@@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
       List(5, 6, 7, 8, 9))
 
-    val coalesced2 = new CoalescedRDD(data, 3)
+    val coalesced2 = data.coalesce(3)
     assert(coalesced2.collect().toList === (1 to 10).toList)
     assert(coalesced2.glom().collect().map(_.toList).toList ===
       List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
 
-    val coalesced3 = new CoalescedRDD(data, 10)
+    val coalesced3 = data.coalesce(10)
     assert(coalesced3.collect().toList === (1 to 10).toList)
     assert(coalesced3.glom().collect().map(_.toList).toList ===
       (1 to 10).map(x => List(x)).toList)
 
     // If we try to coalesce into more partitions than the original RDD, it should just
     // keep the original number of partitions.
-    val coalesced4 = new CoalescedRDD(data, 20)
+    val coalesced4 = data.coalesce(20)
     assert(coalesced4.collect().toList === (1 to 10).toList)
     assert(coalesced4.glom().collect().map(_.toList).toList ===
       (1 to 10).map(x => List(x)).toList)