diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 6b0cc2fbf15e6787d330cefa74bf72ed5fd9bd5d..0be4b4feb8c360e9812c0d7d7db118f713ab8dcf 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -85,17 +85,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { - self.mapPartitions(aggregator.combineValuesByKey(_), true) + self.mapPartitions(aggregator.combineValuesByKey, true) } else if (mapSideCombine) { - val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true) - val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass) - partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true) + val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true) + val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner) + .setSerializer(serializerClass) + partitioned.mapPartitions(aggregator.combineCombinersByKey, true) } else { // Don't apply map-side combiner. // A sanity check to make sure mergeCombiners is not defined. assert(mergeCombiners == null) - val values = new ShuffledRDD[K, V](self, partitioner, serializerClass) - values.mapPartitions(aggregator.combineValuesByKey(_), true) + val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass) + values.mapPartitions(aggregator.combineValuesByKey, true) } } @@ -233,31 +234,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } /** - * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` - * is true, Spark will group values of the same key together on the map side before the - * repartitioning, to only send each key over the network once. If a large number of - * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should - * be set to true. + * Return a copy of the RDD partitioned using the specified partitioner. 
*/ - def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = { - if (getKeyClass().isArray) { - if (mapSideCombine) { - throw new SparkException("Cannot use map-side combining with array keys.") - } - if (partitioner.isInstanceOf[HashPartitioner]) { - throw new SparkException("Default partitioner cannot partition array keys.") - } - } - if (mapSideCombine) { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner) - bufs.flatMapValues(buf => buf) - } else { - new ShuffledRDD[K, V](self, partitioner) + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { + if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) { + throw new SparkException("Default partitioner cannot partition array keys.") } + new ShuffledRDD[K, V](self, partitioner) } /** diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index ff12e8b76cfdbf9d339e7f4458744e038b3b3df2..c2995b836a14a2d812e0f4dbecfaf193b4d5cadd 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -253,11 +253,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif fromRDD(rdd.subtract(other, p)) /** - * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` - * is true, Spark will group values of the same key together on the map side before the - * repartitioning, to only send each key over the network once. If a large number of - * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should - * be set to true. + * Return a copy of the RDD partitioned using the specified partitioner. */ def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] = fromRDD(rdd.partitionBy(partitioner)) diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index c540cd36eb994d46aae3a448601fecb5630e723a..c2d95dc06049f8b603abbf9373422dfc4739c882 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap} import scala.collection.JavaConversions import scala.collection.mutable.ArrayBuffer -import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext} +import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} @@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) override def hashCode(): Int = idx } -private[spark] class CoGroupAggregator - extends Aggregator[Any, Any, ArrayBuffer[Any]]( - { x => ArrayBuffer(x) }, - { (b, x) => b += x }, - { (b1, b2) => b1 ++ b2 }) - with Serializable - /** * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a @@ -66,34 +59,25 @@ private[spark] class CoGroupAggregator * * @param rdds parent RDDs. * @param part partitioner used to partition the shuffle output. - * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag - * is on, Spark does an extra pass over the data on the map side to merge - * all values belonging to the same key together. 
This can reduce the amount - * of data shuffled if and only if the number of distinct keys is very small, - * and the ratio of key size to value size is also very small. */ -class CoGroupedRDD[K]( - @transient var rdds: Seq[RDD[(K, _)]], - part: Partitioner, - val mapSideCombine: Boolean = false, - val serializerClass: String = null) +class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { - private val aggr = new CoGroupAggregator + private var serializerClass: String = null + + def setSerializer(cls: String): CoGroupedRDD[K] = { + serializerClass = cls + this + } override def getDependencies: Seq[Dependency[_]] = { - rdds.map { rdd => + rdds.map { rdd: RDD[(K, _)] => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - if (mapSideCombine) { - val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) - new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass) - } else { - new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) - } + new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) } } } @@ -145,16 +129,8 @@ class CoGroupedRDD[K]( case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher - if (mapSideCombine) { - // With map side combine on, for each key, the shuffle fetcher returns a list of values. - fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach { - case (key, values) => getSeq(key)(depNum) ++= values - } - } else { - // With map side combine off, for each key the shuffle fetcher returns a single value. - fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach { - case (key, value) => getSeq(key)(depNum) += value - } + fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach { + case (key, value) => getSeq(key)(depNum) += value } } } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 0137f80953e02a14e537f34539b4f6710682c4f8..bcf7d0d89c4451a1cb15457316a1fdb91fa8a788 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -17,8 +17,9 @@ package spark.rdd -import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext} -import spark.SparkContext._ +import spark._ +import scala.Some +import scala.Some private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { @@ -30,15 +31,24 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * The resulting RDD from a shuffle (e.g. repartitioning of data). * @param prev the parent RDD. * @param part the partitioner used to partition the RDD - * @param serializerClass class name of the serializer to use. * @tparam K the key class. * @tparam V the value class. 
*/ class ShuffledRDD[K, V]( - @transient prev: RDD[(K, V)], - part: Partitioner, - serializerClass: String = null) - extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) { + @transient var prev: RDD[(K, V)], + part: Partitioner) + extends RDD[(K, V)](prev.context, Nil) { + + private var serializerClass: String = null + + def setSerializer(cls: String): ShuffledRDD[K, V] = { + serializerClass = cls + this + } + + override def getDependencies: Seq[Dependency[_]] = { + List(new ShuffleDependency(prev, part, serializerClass)) + } override val partitioner = Some(part) @@ -51,4 +61,9 @@ class ShuffledRDD[K, V]( SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics, SparkEnv.get.serializerManager.get(serializerClass)) } + + override def clearDependencies() { + super.clearDependencies() + prev = null + } } diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 0402b9f2501d62e2fa9e49aa096ee193926519be..46b8cafaac3396f2dcf39f87e921b3de19001ca2 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -49,10 +49,16 @@ import spark.OneToOneDependency private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest]( @transient var rdd1: RDD[(K, V)], @transient var rdd2: RDD[(K, W)], - part: Partitioner, - val serializerClass: String = null) + part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) { + private var serializerClass: String = null + + def setSerializer(cls: String): SubtractedRDD[K, V, W] = { + serializerClass = cls + this + } + override def getDependencies: Seq[Dependency[_]] = { Seq(rdd1, rdd2).map { rdd => if (rdd.partitioner == Some(part)) { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 752e4b85e67d2585f128a9e76b754aa49cafee1a..c686b8cc5af43a04dddad6d9256b9339b947d0e8 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -17,17 +17,8 @@ package spark -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashSet - import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ - -import com.google.common.io.Files import spark.rdd.ShuffledRDD import spark.SparkContext._ @@ -59,8 +50,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } // If the Kryo serializer is not used correctly, the shuffle would fail because the // default Java serializer cannot handle the non serializable class. - val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS), - classOf[spark.KryoSerializer].getName) + val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS)) + .setSerializer(classOf[spark.KryoSerializer].getName) val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId assert(c.count === 10) @@ -81,7 +72,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } // If the Kryo serializer is not used correctly, the shuffle would fail because the // default Java serializer cannot handle the non serializable class. 
- val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName) + val c = new ShuffledRDD(b, new HashPartitioner(3)) + .setSerializer(classOf[spark.KryoSerializer].getName) assert(c.count === 10) } @@ -96,7 +88,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { // NOTE: The default Java serializer doesn't create zero-sized blocks. // So, use Kryo - val c = new ShuffledRDD(b, new HashPartitioner(10), classOf[spark.KryoSerializer].getName) + val c = new ShuffledRDD(b, new HashPartitioner(10)) + .setSerializer(classOf[spark.KryoSerializer].getName) val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId assert(c.count === 4)
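
For reference, here is a minimal caller-side sketch of the API after this patch, based only on the signatures visible in the diff above: the shuffle serializer moves from a constructor argument to a chainable setSerializer call on ShuffledRDD (CoGroupedRDD and SubtractedRDD gain the same setter), and partitionBy no longer accepts a mapSideCombine flag. The object name, the SparkContext arguments, and the sc/pairs values are hypothetical, chosen only to exercise the calls; map-side combining itself remains available through combineByKey/reduceByKey, which is what the sketch falls back to.

    import spark.{HashPartitioner, SparkContext}
    import spark.SparkContext._   // pulls in the implicit conversion to PairRDDFunctions
    import spark.rdd.ShuffledRDD

    // Hypothetical standalone example; names and data are illustrative only.
    object SerializerApiSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "serializer-api-sketch")
        val pairs = sc.parallelize(1 to 100, 2).map(i => (i % 10, i))

        // Before this patch: new ShuffledRDD(pairs, part, classOf[spark.KryoSerializer].getName)
        // After it, the serializer is attached with the chainable setter instead.
        val shuffled = new ShuffledRDD[Int, Int](pairs, new HashPartitioner(4))
          .setSerializer(classOf[spark.KryoSerializer].getName)

        // partitionBy now only takes a partitioner; callers that relied on
        // mapSideCombine = true should switch to combineByKey/reduceByKey,
        // which still merge values on the map side before the shuffle.
        val repartitioned = pairs.partitionBy(new HashPartitioner(4))
        val sums = pairs.reduceByKey(_ + _)

        println((shuffled.count, repartitioned.count, sums.count))
        sc.stop()
      }
    }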