diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 136b4da61e5cf6afe8b0ca8aea84ff4e36be19a8..9af401986d350ec4b47495638d64ccc33531034a 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -28,18 +28,18 @@ import scala.collection.JavaConversions._
   * @param mergeCombiners function to merge outputs from multiple mergeValue function.
   */
 case class Aggregator[K, V, C] (
-    val createCombiner: V => C,
-    val mergeValue: (C, V) => C,
-    val mergeCombiners: (C, C) => C) {
+    createCombiner: V => C,
+    mergeValue: (C, V) => C,
+    mergeCombiners: (C, C) => C) {
 
-  def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
-    for ((k, v) <- iter) {
-      val oldC = combiners.get(k)
+    for (kv <- iter) {
+      val oldC = combiners.get(kv._1)
       if (oldC == null) {
-        combiners.put(k, createCombiner(v))
+        combiners.put(kv._1, createCombiner(kv._2))
       } else {
-        combiners.put(k, mergeValue(oldC, v))
+        combiners.put(kv._1, mergeValue(oldC, kv._2))
       }
     }
     combiners.iterator
@@ -47,7 +47,7 @@ case class Aggregator[K, V, C] (
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
-    for ((k, c) <- iter) {
+    iter.foreach { case(k, c) =>
       val oldC = combiners.get(k)
       if (oldC == null) {
         combiners.put(k, c)
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 8f6953b1f52c5ef495e0db044daedcbd217132ef..1ec95ed9b88e049b72d30d56f06f0c386fbc6ebc 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -28,8 +28,9 @@ import spark.util.CompletionIterator
 
 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
 
-  override def fetch[K, V](
-    shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer) = {
+  override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
+    : Iterator[T] =
+  {
 
     logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
     val blockManager = SparkEnv.get.blockManager
@@ -49,12 +50,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
         (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
     }
 
-    def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
+    def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
       val blockId = blockPair._1
       val blockOption = blockPair._2
       blockOption match {
         case Some(block) => {
-          block.asInstanceOf[Iterator[(K, V)]]
+          block.asInstanceOf[Iterator[T]]
         }
         case None => {
           val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
@@ -73,7 +74,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
     val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
     val itr = blockFetcherItr.flatMap(unpackBlock)
 
-    CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
+    CompletionIterator[T, Iterator[T]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index b1edaa06f80733af95341d82ce9ee0bd87ab2efc..d5a960657095634151ddb3e4513b934212a0e408 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -44,10 +44,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * @param serializerClass class name of the serializer to use
  */
 class ShuffleDependency[K, V](
-    @transient rdd: RDD[(K, V)],
+    @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializerClass: String = null)
-  extends Dependency(rdd) {
+  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()
 }
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0be4b4feb8c360e9812c0d7d7db118f713ab8dcf..e7d4a7f562ce5e1df95ad9238f7f0360fc7088f1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,9 +21,8 @@ import java.nio.ByteBuffer
 import java.util.{Date, HashMap => JHashMap}
 import java.text.SimpleDateFormat
 
-import scala.collection.Map
+import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.OutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
@@ -50,8 +48,7 @@ import spark.Partitioner._
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  * Import `spark.SparkContext._` at the top of your program to use these functions.
  */
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
-    self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
   extends Logging
   with HadoopMapReduceUtil
   with Serializable {
@@ -85,18 +82,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     }
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
-      self.mapPartitions(aggregator.combineValuesByKey, true)
+      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
     } else if (mapSideCombine) {
-      val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true)
-      val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
-      partitioned.mapPartitions(aggregator.combineCombinersByKey, true)
+      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
     } else {
       // Don't apply map-side combiner.
       // A sanity check to make sure mergeCombiners is not defined.
       assert(mergeCombiners == null)
-      val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass)
-      values.mapPartitions(aggregator.combineValuesByKey, true)
+      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
     }
   }
 
@@ -168,7 +165,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
     def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
       val map = new JHashMap[K, V]
-      for ((k, v) <- iter) {
+      iter.foreach { case (k, v) =>
         val old = map.get(k)
         map.put(k, if (old == null) v else func(old, v))
       }
@@ -176,11 +173,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     }
 
     def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
-      for ((k, v) <- m2) {
+      m2.foreach { case (k, v) =>
         val old = m1.get(k)
         m1.put(k, if (old == null) v else func(old, v))
       }
-      return m1
+      m1
     }
 
     self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -240,7 +237,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    new ShuffledRDD[K, V](self, partitioner)
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
   }
 
   /**
@@ -249,9 +246,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues {
-      case (vs, ws) =>
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
     }
   }
 
@@ -262,13 +258,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues {
-      case (vs, ws) =>
-        if (ws.isEmpty) {
-          vs.iterator.map(v => (v, None))
-        } else {
-          for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
-        }
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (ws.isEmpty) {
+        vs.iterator.map(v => (v, None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+      }
     }
   }
 
@@ -280,13 +275,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues {
-      case (vs, ws) =>
-        if (vs.isEmpty) {
-          ws.iterator.map(w => (None, w))
-        } else {
-          for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
-        }
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty) {
+        ws.iterator.map(w => (None, w))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+      }
     }
   }
 
@@ -378,7 +372,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
-  def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+  def collectAsMap(): Map[K, V] = {
+    val data = self.toArray()
+    val map = new mutable.HashMap[K, V]
+    map.sizeHint(data.length)
+    data.foreach { case (k, v) => map.put(k, v) }
+    map
+  }
 
   /**
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
@@ -406,13 +406,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    val cg = new CoGroupedRDD[K](
-        Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
-        partitioner)
+    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues {
-      case Seq(vs, ws) =>
-        (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    prfs.mapValues { case Seq(vs, ws) =>
+      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
     }
   }
 
@@ -425,15 +422,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
-    val cg = new CoGroupedRDD[K](
-        Seq(self.asInstanceOf[RDD[(K, _)]],
-            other1.asInstanceOf[RDD[(K, _)]],
-            other2.asInstanceOf[RDD[(K, _)]]),
-        partitioner)
+    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues {
-      case Seq(vs, w1s, w2s) =>
-        (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+    prfs.mapValues { case Seq(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
     }
   }
 
@@ -664,7 +656,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     val writer = new HadoopWriter(conf)
     writer.preSetup()
 
-    def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
+    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -703,54 +695,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
 }
 
-/**
- * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
- */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
-  self: RDD[(K, V)])
-  extends Logging
-  with Serializable {
-
-  /**
-   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
-   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
-   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
-   * order of the keys).
-   */
-  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
-    val shuffled =
-      new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
-    shuffled.mapPartitions(iter => {
-      val buf = iter.toArray
-      if (ascending) {
-        buf.sortWith((x, y) => x._1 < y._1).iterator
-      } else {
-        buf.sortWith((x, y) => x._1 > y._1).iterator
-      }
-    }, true)
-  }
-}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
-  override def getPartitions = firstParent[(K, V)].partitions
-  override val partitioner = firstParent[(K, V)].partitioner
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
-}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
-  extends RDD[(K, U)](prev) {
-
-  override def getPartitions = firstParent[(K, V)].partitions
-  override val partitioner = firstParent[(K, V)].partitioner
-  override def compute(split: Partition, context: TaskContext) = {
-    firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
-  }
-}
 
 private[spark] object Manifests {
   val seqSeqManifest = classManifest[Seq[Seq[_]]]
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 6035bc075ef286bf8226143c83f442f921b91f3a..65da8235d75085bdca8ce20f6c40a537990fd203 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -84,7 +84,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  */
 class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
     partitions: Int,
-    @transient rdd: RDD[(K,V)],
+    @transient rdd: RDD[_ <: Product2[K,V]],
     private val ascending: Boolean = true) 
   extends Partitioner {
 
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 503ea6ccbf47bbbe47ad66075e2171f66467aa09..c9a044afabb91445385d711572094ea64d16f586 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,9 +31,8 @@ import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 
-import spark.api.java.JavaRDD
-import spark.broadcast.Broadcast
 import spark.Partitioner._
+import spark.api.java.JavaRDD
 import spark.partial.BoundedDouble
 import spark.partial.CountEvaluator
 import spark.partial.GroupedCountEvaluator
@@ -287,7 +286,10 @@ abstract class RDD[T: ClassManifest](
   def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
     if (shuffle) {
       // include a shuffle step so that our upstream tasks are still distributed
-      new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+      new CoalescedRDD(
+        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new HashPartitioner(numPartitions)),
+        numPartitions).keys
     } else {
       new CoalescedRDD(this, numPartitions)
     }
@@ -302,8 +304,8 @@ abstract class RDD[T: ClassManifest](
   def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
     var fraction = 0.0
     var total = 0
-    var multiplier = 3.0
-    var initialCount = this.count()
+    val multiplier = 3.0
+    val initialCount = this.count()
     var maxSelected = 0
 
     if (num < 0) {
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index dcced035e74d4a8d407b4303a4f7e3f2f5231a62..a6839cf7a40dc94b6cd49010acee7f8d259c5f5c 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -22,12 +22,13 @@ import spark.serializer.Serializer
 
 
 private[spark] abstract class ShuffleFetcher {
+
   /**
    * Fetch the shuffle outputs for a given ShuffleDependency.
    * @return An iterator over the elements of the fetched shuffle outputs.
    */
-  def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
-    serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[(K,V)]
+  def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
+      serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
 
   /** Stop the fetcher */
   def stop() {}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 80c65dfebd2ad1dd872c919ed9104bb91ed32d52..185c76366fcc85f134316c7e50763182e1cd76d6 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -20,19 +20,14 @@ package spark
 import java.io._
 import java.net.URI
 import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
 import scala.util.DynamicVariable
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
-import akka.actor.Actor._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -54,23 +49,22 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.mesos.MesosNativeLibrary
 
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
+import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
+  OrderedRDDFunctions}
 import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
-  SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+  SplitInfo, Stage, StageInfo, TaskScheduler}
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler, Schedulable, SchedulingMode}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import spark.ui.SparkUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI}
-import spark.metrics._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -833,11 +827,11 @@ class SparkContext(
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
-  private var nextShuffleId = new AtomicInteger(0)
+  private val nextShuffleId = new AtomicInteger(0)
 
   private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
 
-  private var nextRddId = new AtomicInteger(0)
+  private val nextRddId = new AtomicInteger(0)
 
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
@@ -886,7 +880,7 @@ object SparkContext {
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
       rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions(rdd)
+    new OrderedRDDFunctions[K, V, (K, V)](rdd)
 
   implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
 
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c2995b836a14a2d812e0f4dbecfaf193b4d5cadd..effe6e5e0d6cd6d9626192f07e2d811e055412f9 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -30,17 +30,18 @@ import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.conf.Configuration
 
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.OrderedRDDFunctions
-import spark.storage.StorageLevel
 import spark.HashPartitioner
 import spark.Partitioner
 import spark.Partitioner._
 import spark.RDD
 import spark.SparkContext.rddToPairRDDFunctions
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+
 
 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
   implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
@@ -563,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
       override def compare(b: K) = comp.compare(a, b)
     }
     implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
-    fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
+    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
   }
 
   /**
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c2d95dc06049f8b603abbf9373422dfc4739c882..01b6c23dcc202a5b701dbbf34d97aa529a0eaeec 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -60,7 +60,7 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
  * @param rdds parent RDDs.
  * @param part partitioner used to partition the shuffle output.
  */
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
   private var serializerClass: String = null
@@ -71,13 +71,13 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
-    rdds.map { rdd: RDD[(K, _)] =>
+    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
       if (rdd.partitioner == Some(part)) {
-        logInfo("Adding one-to-one dependency with " + rdd)
+        logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
-        logInfo("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
+        logDebug("Adding shuffle dependency with " + rdd)
+        new ShuffleDependency[Any, Any](rdd, part, serializerClass)
       }
     }
   }
@@ -122,15 +122,15 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        for ((k, v) <- rdd.iterator(itsSplit, context)) {
-          getSeq(k.asInstanceOf[K])(depNum) += v
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
+          getSeq(kv._1)(depNum) += kv._2
         }
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
-          case (key, value) => getSeq(key)(depNum) += value
+        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
+          kv => getSeq(kv._1)(depNum) += kv._2
         }
       }
     }
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a6bdce89d8a4ce317107e31da8d11ba8b14cfb2d
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, V]].partitions
+
+  override val partitioner = firstParent[Product2[K, V]].partitioner
+
+  override def compute(split: Partition, context: TaskContext) = {
+    firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
+      f(v).map(x => (k, x))
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8334e3b557dd22f8deb597b8e11c775a0d4dee63
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+
+import spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, U]].partitions
+
+  override val partitioner = firstParent[Product2[K, U]].partitioner
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+  }
+}
diff --git a/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9154b760359aa1836d214219f216a136dfc25b09
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
+                          V: ClassManifest,
+                          P <: Product2[K, V] : ClassManifest](
+    self: RDD[P])
+  extends Logging with Serializable {
+
+  /**
+   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+   * order of the keys).
+   */
+  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
+    val part = new RangePartitioner(numPartitions, self, ascending)
+    val shuffled = new ShuffledRDD[K, V, P](self, part)
+    shuffled.mapPartitions(iter => {
+      val buf = iter.toArray
+      if (ascending) {
+        buf.sortWith((x, y) => x._1 < y._1).iterator
+      } else {
+        buf.sortWith((x, y) => x._1 > y._1).iterator
+      }
+    }, preservesPartitioning = true)
+  }
+}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index bcf7d0d89c4451a1cb15457316a1fdb91fa8a788..51c05af064ef7456c0020d0f78a68ebd31a9b069 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -17,9 +17,7 @@
 
 package spark.rdd
 
-import spark._
-import scala.Some
-import scala.Some
+import spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
 
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -34,14 +32,14 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  * @tparam K the key class.
  * @tparam V the value class.
  */
-class ShuffledRDD[K, V](
-    @transient var prev: RDD[(K, V)],
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+    @transient var prev: RDD[P],
     part: Partitioner)
-  extends RDD[(K, V)](prev.context, Nil) {
+  extends RDD[P](prev.context, Nil) {
 
   private var serializerClass: String = null
 
-  def setSerializer(cls: String): ShuffledRDD[K, V] = {
+  def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
     serializerClass = cls
     this
   }
@@ -56,9 +54,9 @@ class ShuffledRDD[K, V](
     Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
     val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
-    SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
+    SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
       SparkEnv.get.serializerManager.get(serializerClass))
   }
 
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 46b8cafaac3396f2dcf39f87e921b3de19001ca2..dadef5e17de594431ca70002438b025448248cfc 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -47,8 +47,8 @@ import spark.OneToOneDependency
  * out of memory because of the size of `rdd2`.
  */
 private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
-    @transient var rdd1: RDD[(K, V)],
-    @transient var rdd2: RDD[(K, W)],
+    @transient var rdd1: RDD[_ <: Product2[K, V]],
+    @transient var rdd2: RDD[_ <: Product2[K, W]],
     part: Partitioner)
   extends RDD[(K, V)](rdd1.context, Nil) {
 
@@ -62,11 +62,11 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
   override def getDependencies: Seq[Dependency[_]] = {
     Seq(rdd1, rdd2).map { rdd =>
       if (rdd.partitioner == Some(part)) {
-        logInfo("Adding one-to-one dependency with " + rdd)
+        logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
-        logInfo("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass)
+        logDebug("Adding shuffle dependency with " + rdd)
+        new ShuffleDependency(rdd, part, serializerClass)
       }
     }
   }
@@ -103,16 +103,14 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
         seq
       }
     }
-    def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match {
+    def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
-        for (t <- rdd.iterator(itsSplit, context))
-          op(t.asInstanceOf[(K, V)])
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
-        val iter = SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index,
+        val iter = SparkEnv.get.shuffleFetcher.fetch[Product2[K, V]](shuffleId, partition.index,
           context.taskMetrics, serializer)
-        for (t <- iter)
-          op(t.asInstanceOf[(K, V)])
+        iter.foreach(op)
       }
     }
     // the first dep is rdd1; add all values to the map
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index e3bb6d1e609950acc1c5a24831115fe7ab193244..121ff31121d3ab6a1191fbe89f1b3ecac47d177d 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -148,7 +148,7 @@ private[spark] class ShuffleMapTask(
 
       // Write the map output to its associated buckets.
       for (elem <- rdd.iterator(split, taskContext)) {
-        val pair = elem.asInstanceOf[(Any, Any)]
+        val pair = elem.asInstanceOf[Product2[Any, Any]]
         val bucketId = dep.partitioner.getPartition(pair._1)
         buckets.writers(bucketId).write(pair)
       }
diff --git a/core/src/main/scala/spark/util/MutablePair.scala b/core/src/main/scala/spark/util/MutablePair.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3063806e83e82f39bbcac9559068ae7d65da20b9
--- /dev/null
+++ b/core/src/main/scala/spark/util/MutablePair.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+
+/**
+ * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
+ * minimize object allocation.
+ *
+ * @param  _1   Element 1 of this MutablePair
+ * @param  _2   Element 2 of this MutablePair
+ */
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
+                      @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+  (var _1: T1, var _2: T2)
+  extends Product2[T1, T2]
+{
+  override def toString = "(" + _1 + "," + _2 + ")"
+
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[T1, T2]]
+}
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index a84c89e3c9517605f726631523faf98a5a6974ad..966dede2be12a8ba626f49c69320e76057bb59d6 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   test("ShuffledRDD") {
     testCheckpointing(rdd => {
       // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
-      new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
+      new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
     })
   }
 
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
index b102eaf4e65bcfdc03e7c479d725e6378fd18ade..328b3b5497ea884f4c9039c672f24398b4ca2fd5 100644
--- a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 
 import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
 
 import com.google.common.io.Files
-
-import spark.rdd.ShuffledRDD
 import spark.SparkContext._
 
+
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("groupByKey") {
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cbddf4e523d16a1872c3f93d61afdcadcdb13dbe..75778de1cc916dd52135d343def2951613ed2fbe 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -170,7 +170,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
     // we can optionally shuffle to keep the upstream parallel
     val coalesced5 = data.coalesce(1, shuffle = true)
-    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
       null)
   }
 
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index c686b8cc5af43a04dddad6d9256b9339b947d0e8..8745689c70bca55f8b11a6579cfdbc0ac9e00e2a 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -20,8 +20,11 @@ package spark
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import spark.rdd.ShuffledRDD
 import spark.SparkContext._
+import spark.ShuffleSuite.NonJavaSerializableClass
+import spark.rdd.{SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
+import spark.util.MutablePair
+
 
 class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
   test("groupByKey without compression") {
@@ -46,12 +49,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
 
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
-      (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+      (x, new NonJavaSerializableClass(x * 2))
     }
     // If the Kryo serializer is not used correctly, the shuffle would fail because the
     // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS))
-      .setSerializer(classOf[spark.KryoSerializer].getName)
+    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+      b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[spark.KryoSerializer].getName)
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
 
     assert(c.count === 10)
@@ -68,12 +71,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
-      (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+      (x, new NonJavaSerializableClass(x * 2))
     }
     // If the Kryo serializer is not used correctly, the shuffle would fail because the
     // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD(b, new HashPartitioner(3))
-      .setSerializer(classOf[spark.KryoSerializer].getName)
+    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+      b, new HashPartitioner(3)).setSerializer(classOf[spark.KryoSerializer].getName)
     assert(c.count === 10)
   }
 
@@ -88,7 +91,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
 
     // NOTE: The default Java serializer doesn't create zero-sized blocks.
     //       So, use Kryo
-    val c = new ShuffledRDD(b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
       .setSerializer(classOf[spark.KryoSerializer].getName)
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
@@ -114,7 +117,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer should create zero-sized blocks
-    val c = new ShuffledRDD(b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
     assert(c.count === 4)
@@ -128,6 +131,72 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     // We should have at most 4 non-zero sized partitions
     assert(nonEmptyBlocks.size <= 4)
   }
+
+  test("shuffle using mutable pairs") {
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+    val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+    val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
+      .collect()
+
+    data.foreach { pair => results should contain (pair) }
+  }
+
+  test("sorting using mutable pairs") {
+    // This is not in SortingSuite because of the local cluster setup.
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
+    val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+    val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
+      .sortByKey().collect()
+    results(0) should be (p(1, 11))
+    results(1) should be (p(2, 22))
+    results(2) should be (p(3, 33))
+    results(3) should be (p(100, 100))
+  }
+
+  test("cogroup using mutable pairs") {
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+    val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
+    val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+    val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+
+    assert(results(1)(0).length === 3)
+    assert(results(1)(0).contains(1))
+    assert(results(1)(0).contains(2))
+    assert(results(1)(0).contains(3))
+    assert(results(1)(1).length === 2)
+    assert(results(1)(1).contains("11"))
+    assert(results(1)(1).contains("12"))
+    assert(results(2)(0).length === 1)
+    assert(results(2)(0).contains(1))
+    assert(results(2)(1).length === 1)
+    assert(results(2)(1).contains("22"))
+    assert(results(3)(0).length === 0)
+    assert(results(3)(1).contains("3"))
+  }
+
+  test("subtract mutable pairs") {
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
+    val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
+    val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+    val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+    val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
+    results should have length (1)
+    // substracted rdd return results as Tuple2
+    results(0) should be ((3, 33))
+  }
 }
 
 object ShuffleSuite {
diff --git a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
index 30fded12f0278eeb74b9c75d1a7da0e3bbf306db..f45d0b281cf0a1a38def95993c6c6569f00abc42 100644
--- a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
@@ -17,13 +17,15 @@
 
 package spark.tools
 
-import spark._
 import java.lang.reflect.Method
+
 import scala.collection.mutable.ArrayBuffer
+
+import spark._
 import spark.api.java._
+import spark.rdd.OrderedRDDFunctions
 import spark.streaming.{PairDStreamFunctions, DStream, StreamingContext}
 import spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
-import scala.Tuple2
 
 
 private[spark] abstract class SparkType(val name: String)
@@ -336,7 +338,7 @@ object JavaAPICompletenessChecker {
     println()
 
     println("Missing OrderedRDD methods")
-    printMissingMethods(classOf[OrderedRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]])
+    printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]])
     println()
 
     println("Missing SparkContext methods")