diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 6516bea157233444021b16eba6cb05b8d9722907..b0daa70cfdf14f3f59ee45ade35b251a1472c5e5 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,9 +9,9 @@ package spark
   *                       known as map-side aggregations. When set to false, 
   *                       mergeCombiners function is not used.
   */
-class Aggregator[K, V, C] (
+case class Aggregator[K, V, C] (
     val createCombiner: V => C,
     val mergeValue: (C, V) => C,
     val mergeCombiners: (C, C) => C,
     val mapSideCombine: Boolean = true)
-  extends Serializable
+
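
Making Aggregator a case class gives it a companion apply (used later in this patch as Aggregator[K, V, V](null, null, null, false)) and structural equality; it presumably also stays serializable because case classes mix in Serializable, which would explain dropping the explicit extends Serializable. A minimal sketch, not part of the patch, of constructing an Aggregator for a count-by-key combine:

    // Sketch only: a count-by-key Aggregator built from the three combine functions.
    val countAggregator = Aggregator[String, Int, Int](
      createCombiner = (v: Int) => v,                  // first value seen for a key
      mergeValue = (c: Int, v: Int) => c + v,          // fold another value into the combiner
      mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // merge two map-side combiners
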
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 64018f8c6bf1efaee6e5585fc6aae9c1c46dbc53..aa1d00c63c3f25b611236602e0867040b1bd9c23 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,11 +1,10 @@
 package spark
 
 import java.io.EOFException
-import java.net.URL
 import java.io.ObjectInputStream
+import java.net.URL
+import java.util.{Date, HashMap => JHashMap}
 import java.util.concurrent.atomic.AtomicLong
-import java.util.{HashMap => JHashMap}
-import java.util.Date
 import java.text.SimpleDateFormat
 
 import scala.collection.Map
@@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def combineByKey[C](createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
-      partitioner: Partitioner): RDD[(K, C)] = {
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    new ShuffledRDD(self, aggregator, partitioner)
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true): RDD[(K, C)] = {
+    val aggregator =
+      if (mapSideCombine) {
+        new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      } else {
+        // Don't apply a map-side combiner.
+        // Sanity check: mergeCombiners should not be defined when map-side combine is disabled.
+        assert(mergeCombiners == null)
+        new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
+      }
+    new ShuffledAggregatedRDD(self, aggregator, partitioner)
   }
 
   def combineByKey[C](createCombiner: V => C,
@@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
     combineByKey[V]((v: V) => v, func, func, partitioner)
   }
-  
+
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
     def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
       val map = new JHashMap[K, V]
@@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     groupByKey(new HashPartitioner(numSplits))
   }
 
-  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
-    val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
-    bufs.flatMapValues(buf => buf)
+  /**
+   * Repartition the RDD using the specified partitioner. If mapSideCombine is
+   * true, Spark will group values of the same key together on the map side
+   * before the repartitioning. If a large number of duplicate keys is
+   * expected and the keys themselves are large, mapSideCombine should be
+   * set to true.
+   */
+  def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
+    if (mapSideCombine) {
+      def createCombiner(v: V) = ArrayBuffer(v)
+      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+      def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+      val bufs = combineByKey[ArrayBuffer[V]](
+        createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+      bufs.flatMapValues(buf => buf)
+    } else {
+      new RepartitionShuffledRDD(self, partitioner)
+    }
   }
 
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
@@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   }
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
-  
+
   def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new MappedValuesRDD(self, cleanF)
   }
-  
+
   def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new FlatMappedValuesRDD(self, cleanF)
   }
-  
+
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
     val cg = new CoGroupedRDD[K](
         Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
@@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
     }
   }
-  
+
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
     val cg = new CoGroupedRDD[K](
         Seq(self.asInstanceOf[RDD[(_, _)]],
-            other1.asInstanceOf[RDD[(_, _)]], 
+            other1.asInstanceOf[RDD[(_, _)]],
             other2.asInstanceOf[RDD[(_, _)]]),
         partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
@@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
     saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
-  
+
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
     saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
@@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
     saveAsHadoopDataset(conf)
   }
-  
+
   def saveAsHadoopDataset(conf: JobConf) {
     val outputFormatClass = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
@@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    
+
     logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
 
     val writer = new HadoopWriter(conf)
@@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
       writer.setup(context.stageId, context.splitId, attemptNumber)
       writer.open()
-      
+
       var count = 0
       while(iter.hasNext) {
         val record = iter.next
         count += 1
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
       }
-    
+
       writer.close()
       writer.commit()
     }
@@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
 class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
   self: RDD[(K, V)])
-  extends Logging 
+  extends Logging
   with Serializable {
 
   def sortByKey(ascending: Boolean = true): RDD[(K,V)] = {
-    val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending))
-    new SortedRDD(rangePartitionedRDD, ascending)
+    new ShuffledSortedRDD(self, ascending)
   }
 }
 
-class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
-  extends RDD[(K, V)](prev.context) {
-
-  override def splits = prev.splits
-  override val partitioner = prev.partitioner
-  override val dependencies = List(new OneToOneDependency(prev))
-
-  override def compute(split: Split) = {
-    prev.iterator(split).toArray
-      .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
-  }
-} 
- 
 class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) {
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
@@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
 
 class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
   extends RDD[(K, U)](prev.context) {
-  
+
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
   override val partitioner = prev.partitioner
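
The new mapSideCombine flags on combineByKey and partitionBy are the caller-facing side of the ShuffledRDD split below. A minimal usage sketch, not part of the patch, assuming an existing SparkContext sc and import SparkContext._:

    // Sketch only: partitionBy with and without map-side grouping.
    val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    // Default: shuffle raw (key, value) pairs through a RepartitionShuffledRDD.
    val plain = pairs.partitionBy(new HashPartitioner(4))
    // Opt in: buffer values of the same key on the map side before the shuffle.
    val grouped = pairs.partitionBy(new HashPartitioner(4), mapSideCombine = true)
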
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 3616d8e47ec85ffb7e819e2544ff20d29bbb9d41..a7346060b3ccb542abb5ee50a2697db7555de4ca 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -1,29 +1,89 @@
 package spark
 
+import scala.collection.mutable.ArrayBuffer
 import java.util.{HashMap => JHashMap}
 
+
 class ShuffledRDDSplit(val idx: Int) extends Split {
   override val index = idx
   override def hashCode(): Int = idx
 }
 
-class ShuffledRDD[K, V, C](
+
+/**
+ * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ */
+abstract class ShuffledRDD[K, V, C](
     @transient parent: RDD[(K, V)],
     aggregator: Aggregator[K, V, C],
-    part : Partitioner) 
+    part : Partitioner)
   extends RDD[(K, C)](parent.context) {
-  //override val partitioner = Some(part)
+
   override val partitioner = Some(part)
-  
+
   @transient
   val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
 
   override def splits = splits_
-  
+
   override def preferredLocations(split: Split) = Nil
-  
+
   val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
   override val dependencies = List(dep)
+}
+
+
+/**
+ * Repartition a key-value pair RDD.
+ */
+class RepartitionShuffledRDD[K, V](
+    @transient parent: RDD[(K, V)],
+    part : Partitioner)
+  extends ShuffledRDD[K, V, V](
+    parent,
+    Aggregator[K, V, V](null, null, null, false),
+    part) {
+
+  override def compute(split: Split): Iterator[(K, V)] = {
+    val buf = new ArrayBuffer[(K, V)]
+    val fetcher = SparkEnv.get.shuffleFetcher
+    def addTupleToBuffer(k: K, v: V) = { buf += ((k, v)) }
+    fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+    buf.iterator
+  }
+}
+
+
+/**
+ * A sort-based shuffle (that doesn't apply aggregation). It works by first
+ * repartitioning the RDD by range, and then sorting within each range.
+ */
+class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
+    @transient parent: RDD[(K, V)],
+    ascending: Boolean)
+  extends RepartitionShuffledRDD[K, V](
+    parent,
+    new RangePartitioner(parent.splits.size, parent, ascending)) {
+
+  override def compute(split: Split): Iterator[(K, V)] = {
+    // By overriding compute here instead of reusing RepartitionShuffledRDD's,
+    // we avoid a buf.iterator.toArray call and thus avoid building up the buffer twice.
+    val buf = new ArrayBuffer[(K, V)]
+    def addTupleToBuffer(k: K, v: V) = { buf += ((k, v)) }
+    SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+    buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+  }
+}
+
+
+/**
+ * The resulting RDD from a shuffle followed by (hash-based) aggregation.
+ */
+class ShuffledAggregatedRDD[K, V, C](
+    @transient parent: RDD[(K, V)],
+    aggregator: Aggregator[K, V, C],
+    part : Partitioner)
+  extends ShuffledRDD[K, V, C](parent, aggregator, part) {
 
   override def compute(split: Split): Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
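
With this hierarchy, sortByKey no longer range-partitions into an intermediate RDD and wraps it in SortedRDD; ShuffledSortedRDD sorts each fetched partition directly. A caller-level sketch, not part of the patch, assuming an existing SparkContext sc and import SparkContext._:

    // Sketch only: sortByKey now routes through ShuffledSortedRDD.
    val scores = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
    val asc = scores.sortByKey()                    // range-partition, then sort each range
    val desc = scores.sortByKey(ascending = false)  // same, in descending order
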
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3bcc588015d0186325b09017d61432865eb9012c..745aa0c93915b3b6836f38d9e1d364ea6bb3c277 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -44,7 +44,8 @@ object ShuffleMapTask {
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.
-  def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
+  def serializeFileSet(
+    set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
     val old = cache.get(stageId)
     if (old != null) {
       return old
@@ -59,7 +60,6 @@ object ShuffleMapTask {
     }
   }
 
-
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
     synchronized {
       val loader = Thread.currentThread.getContextClassLoader
@@ -113,7 +113,8 @@ class ShuffleMapTask(
     out.writeInt(bytes.length)
     out.write(bytes)
 
-    val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
+    val fileSetBytes = ShuffleMapTask.serializeFileSet(
+      fileSet, stageId, ShuffleMapTask.fileSetCache)
     out.writeInt(fileSetBytes.length)
     out.write(fileSetBytes)
     val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
@@ -172,7 +173,7 @@ class ShuffleMapTask(
         buckets.map(_.iterator)
       } else {
         // No combiners (no map-side aggregation). Simply partition the map output.
-        val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)])
+        val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
         for (elem <- rdd.iterator(split)) {
           val pair = elem.asInstanceOf[(Any, Any)]
           val bucketId = partitioner.getPartition(pair._1)
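
The switch from Array.tabulate to Array.fill here is behavior-preserving: fill re-evaluates its by-name element argument once per slot, so every bucket is still a distinct buffer; it simply drops the unused index parameter. A small sketch, not part of the patch:

    // Sketch only: Array.fill creates a fresh ArrayBuffer for each bucket.
    import scala.collection.mutable.ArrayBuffer
    val buckets = Array.fill(3)(new ArrayBuffer[(Any, Any)])
    assert(buckets(0) ne buckets(1))  // separate instances, not one shared buffer
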
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index f622c413f7c5f8a51707f4edecb8eb4fc905e1d5..9d7e2591f1f9bd73ec0e756d01b9a8c9d4177c6f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer
 import SparkContext._
 
 class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-  
+
   var sc: SparkContext = _
-  
+
   after {
     if (sc != null) {
       sc.stop()
       sc = null
     }
   }
-  
+
   test("groupByKey") {
     sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
   }
-  
+
   test("groupByKey with many output partitions") {
     sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
       (4, (ArrayBuffer(), ArrayBuffer('w')))
     ))
   }
-  
+
   test("zero-partition RDD") {
     sc = new SparkContext("local", "test")
     val emptyDir = Files.createTempDir()
@@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     assert(file.splits.size == 0)
     assert(file.collect().toList === Nil)
     // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)    
+    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
   }
 
   test("map-side combine") {
@@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
       _+_,
       _+_,
       false)
-    val shuffledRdd = new ShuffledRDD(
+    val shuffledRdd = new ShuffledAggregatedRDD(
       pairs, aggregator, new HashPartitioner(2))
     assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
 
@@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     // not see an exception because mergeCombine should not have been called.
     val aggregatorWithException = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
-    val shuffledRdd1 = new ShuffledRDD(
+    val shuffledRdd1 = new ShuffledAggregatedRDD(
       pairs, aggregatorWithException, new HashPartitioner(2))
     assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
 
@@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     // expect to see an exception thrown.
     val aggregatorWithException1 = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
-    val shuffledRdd2 = new ShuffledRDD(
+    val shuffledRdd2 = new ShuffledAggregatedRDD(
       pairs, aggregatorWithException1, new HashPartitioner(2))
     evaluating { shuffledRdd2.collect() } should produce [SparkException]
   }