diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c049bd3fa9072a55b1a74140f2c7c2b736fa9e1a..5db17671460809afc9a43546967120cd05490fe9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -20,19 +20,14 @@ package spark
 import java.io._
 import java.net.URI
 import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
 import scala.util.DynamicVariable
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
-import akka.actor.Actor._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -54,7 +49,6 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.mesos.MesosNativeLibrary
 
@@ -63,15 +57,14 @@ import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
   OrderedRDDFunctions}
 import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
-  SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+  SplitInfo, Stage, StageInfo, TaskScheduler}
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler, Schedulable, SchedulingMode}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import spark.ui.SparkUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI}
-import spark.metrics._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -887,7 +880,7 @@ object SparkContext {
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
       rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions(rdd.asInstanceOf[RDD[Product2[K, V]]])
+    new OrderedRDDFunctions(rdd)
 
   implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
 
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..05fdfd82c1644afbafdc57ac483b8a63700571c8
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, V]].partitions
+
+  override val partitioner = firstParent[Product2[K, V]].partitioner
+
+  override def compute(split: Partition, context: TaskContext) = {
+    firstParent[Product2[K, V]].iterator(split, context).flatMap { case (k, v) =>
+      f(v).map(x => (k, x))
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..21ae97daa9366bad3effc4759d48da7e48c0ae7f
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+
+import spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, U]].partitions
+
+  override val partitioner = firstParent[Product2[K, U]].partitioner
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+    firstParent[Product2[K, V]].iterator(split, context).map { case(k ,v) => (k, f(v)) }
+  }
+}
diff --git a/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6328c6a4ac17b2c038c3578eb9ab4ed14c7561b9
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+    self: RDD[_ <: Product2[K, V]])
+  extends Logging with Serializable {
+
+  /**
+   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+   * order of the keys).
+   */
+  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
+    : RDD[(K, V)] =
+  {
+    val part = new RangePartitioner(numPartitions, self.asInstanceOf[RDD[Product2[K,V]]], ascending)
+    val shuffled = new ShuffledRDD[K, V](self, part)
+    shuffled.mapPartitions(iter => {
+      val buf = iter.toArray
+      if (ascending) {
+        buf.sortWith((x, y) => x._1 < y._1).iterator
+      } else {
+        buf.sortWith((x, y) => x._1 > y._1).iterator
+      }
+    }, preservesPartitioning = true)
+  }
+}