diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 934e4c2f6793bda90335d561b3792080a8490f38..9ffe7c5f992b6dfe82e77d596bcc81bea164be16 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
+
+  @Test
+  public void mapOnPairRDD() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+        return new Tuple2<Integer, Integer>(in._2(), in._1());
+      }
+    });
+    Assert.assertEquals(Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 1),
+        new Tuple2<Integer, Integer>(0, 2),
+        new Tuple2<Integer, Integer>(1, 3),
+        new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+
+  }
 }
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865aef03af96e1bb9547c99786e4980198..39fe0d0cccc474668b3bc30208a37c3f041db8c2 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
   }
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
-    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d60934297f3814ba41fca8d90f2ea76562d44c..26ac82b71ace567872eb8cfd0ab5755ff55f1ae2 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -506,6 +506,62 @@ public class JavaAPISuite implements Serializable {
           new Tuple2<String, Integer>("new york", 3),
           new Tuple2<String, Integer>("new york", 1)));
 
+  @Test
+  public void testPairMap() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+            Arrays.asList(
+                    new Tuple2<Integer, String>(1, "california"),
+                    new Tuple2<Integer, String>(3, "california"),
+                    new Tuple2<Integer, String>(4, "new york"),
+                    new Tuple2<Integer, String>(1, "new york")),
+            Arrays.asList(
+                    new Tuple2<Integer, String>(5, "california"),
+                    new Tuple2<Integer, String>(5, "california"),
+                    new Tuple2<Integer, String>(3, "new york"),
+                    new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.map(
+        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+            return new Tuple2(in._2(),  in._1());
+          }
+    });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMap2() { // Maps pair -> single
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+            Arrays.asList(1, 3, 4, 1),
+            Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> reversed = pairStream.map(
+            new Function<Tuple2<String, Integer>, Integer>() {
+              @Override
+              public Integer call(Tuple2<String, Integer> in) throws Exception {
+                return in._2();
+              }
+            });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testPairGroupByKey() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;