diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 5163c8013475d73a6e25efedf9b70cc0baaeeec9..3b9ced1946b25b14542844936e675b5c8e6faa76 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -517,6 +517,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
       .saveAsSequenceFile(path)
   }
 
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`, pairing each element `x` as `(f(x), x)`.
+   */
+  def keyBy[K](f: T => K): RDD[(K, T)] = {
+    map(x => (f(x), x))
+  }
+
   /** A private method for tests, to look at the contents of each partition */
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
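
For context, a minimal sketch of how the new operator reads from user code. The data and names are hypothetical, and a live SparkContext `sc` is assumed:

    // Pair each element with a derived key; keyBy(f) is shorthand for map(x => (f(x), x)).
    val names = sc.parallelize(Seq("alice", "bob", "carol"))
    val byInitial: RDD[(Char, String)] = names.keyBy(_.head)
    // byInitial.collect() => Array(('a', "alice"), ('b', "bob"), ('c', "carol"))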
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 81d3a944668d007af6e84da90147ed95b499e153..d15f6dd02f5acfe30827355a2d23e52bcc8adab8 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -298,4 +298,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Save this RDD as a SequenceFile of serialized objects.
    */
   def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`, pairing each element `x` as `(f(x), x)`.
+   */
+  def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
+    implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+    JavaPairRDD.fromRDD(rdd.keyBy(f))
+  }
 }
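
The `kcm` cast deserves a note: Java callers cannot supply a `ClassManifest[K]`, so the wrapper reuses the `AnyRef` manifest under a cast. A sketch of the same erasure workaround in isolation, with a hypothetical helper name:

    // Fabricate a ClassManifest for an unconstrained type parameter K. This is
    // workable because generics are erased on the JVM and the manifest is only
    // used to tag arrays of boxed objects, never to instantiate a K directly.
    def fakeClassManifest[K]: ClassManifest[K] =
      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]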
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 0817d1146c4b05772701450ada951eef92e089e2..c61913fc82797e1ede6bd46f47a83ba50af73e7c 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -629,4 +629,16 @@ public class JavaAPISuite implements Serializable {
     floatAccum.setValue(5.0f);
     Assert.assertEquals((Float) 5.0f, floatAccum.value());
   }
+
+  @Test
+  public void keyBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+      public String call(Integer t) throws Exception {
+        return t.toString();
+      }
+    }).collect();
+    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+  }
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 872b06fd080a96305b951525e934ca77542a26ba..d74e9786c3cfcc190d9f713e53c63a174c97226f 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -36,6 +36,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
+    assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
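
Beyond the round-trip checks above, the practical payoff of `keyBy` is that it yields an RDD of pairs, which brings the key-oriented operators into scope. A hedged sketch, again assuming a SparkContext `sc` and hypothetical data:

    // keyBy feeds directly into pair-RDD operators such as groupByKey.
    val words = sc.parallelize(Seq("rdd", "spark", "scala"))
    val byLength = words.keyBy(_.length).groupByKey()
    // byLength.collect() => Array((3, Seq("rdd")), (5, Seq("spark", "scala")))
    // (key ordering in the collected result is not guaranteed)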