From f0b68c623c116540470e06967c1554855d16a500 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Tue, 5 Feb 2013 19:02:46 -0800
Subject: [PATCH] Initial cut at replacing K, V in Java files

---
 core/src/test/scala/spark/JavaAPISuite.java   | 24 ++++++++
 .../streaming/api/java/JavaDStreamLike.scala  |  4 +-
 .../java/spark/streaming/JavaAPISuite.java    | 56 +++++++++++++++++++
 3 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 934e4c2f67..9ffe7c5f99 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
+
+  @Test
+  public void mapOnPairRDD() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+        new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+        return new Tuple2<Integer, Integer>(in._2(), in._1());
+      }
+    });
+    Assert.assertEquals(Arrays.asList(
+        new Tuple2<Integer, Integer>(1, 1),
+        new Tuple2<Integer, Integer>(0, 2),
+        new Tuple2<Integer, Integer>(1, 3),
+        new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+
+  }
 }
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..39fe0d0ccc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
   }
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
-    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d6093429..26ac82b71a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -506,6 +506,62 @@ public class JavaAPISuite implements Serializable {
           new Tuple2<String, Integer>("new york", 3),
           new Tuple2<String, Integer>("new york", 1)));
 
+  @Test
+  public void testPairMap() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+            Arrays.asList(
+                    new Tuple2<Integer, String>(1, "california"),
+                    new Tuple2<Integer, String>(3, "california"),
+                    new Tuple2<Integer, String>(4, "new york"),
+                    new Tuple2<Integer, String>(1, "new york")),
+            Arrays.asList(
+                    new Tuple2<Integer, String>(5, "california"),
+                    new Tuple2<Integer, String>(5, "california"),
+                    new Tuple2<Integer, String>(3, "new york"),
+                    new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.map(
+        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+            return new Tuple2(in._2(),  in._1());
+          }
+    });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMap2() { // Maps pair -> single
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+            Arrays.asList(1, 3, 4, 1),
+            Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> reversed = pairStream.map(
+            new Function<Tuple2<String, Integer>, Integer>() {
+              @Override
+              public Integer call(Tuple2<String, Integer> in) throws Exception {
+                return in._2();
+              }
+            });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testPairGroupByKey() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-- 
GitLab