diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 934e4c2f6793bda90335d561b3792080a8490f38..9ffe7c5f992b6dfe82e77d596bcc81bea164be16 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i % 2); + } + }); + JavaPairRDD<Integer, Integer> rdd3 = rdd2.map( + new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception { + return new Tuple2<Integer, Integer>(in._2(), in._1()); + } + }); + Assert.assertEquals(Arrays.asList( + new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(0, 2), + new Tuple2<Integer, Integer>(1, 3), + new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + + } } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index b93cb7865aef03af96e1bb9547c99786e4980198..ec546c81907c1e6b203b403b29a7e64496c92113 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } @@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = { + def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } @@ -100,8 +100,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ - def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) - : JavaPairDStream[K, V] = { + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) + : JavaPairDStream[K2, V2] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 79d60934297f3814ba41fca8d90f2ea76562d44c..9bfcd83e4d49784dee88ab4329129aa0048af863 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -506,6 +506,141 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 3), new Tuple2<String, Integer>("new york", 1))); + @Test + public void testPairMap() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "california"), + new Tuple2<Integer, String>(3, "california"), + new Tuple2<Integer, String>(4, "new york"), + new Tuple2<Integer, String>(1, "new york")), + Arrays.asList( + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(3, "new york"), + new Tuple2<Integer, String>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.map( + new PairFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { + return in.swap(); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMapPartitions() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "california"), + new Tuple2<Integer, String>(3, "california"), + new Tuple2<Integer, String>(4, "new york"), + new Tuple2<Integer, String>(1, "new york")), + Arrays.asList( + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(3, "new york"), + new Tuple2<Integer, String>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions( + new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { + LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + while (in.hasNext()) { + Tuple2<String, Integer> next = in.next(); + out.add(next.swap()); + } + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMap2() { // Maps pair -> single + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1, 3, 4, 1), + Arrays.asList(5, 5, 3, 1)); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Integer> reversed = pairStream.map( + new Function<Tuple2<String, Integer>, Integer>() { + @Override + public Integer call(Tuple2<String, Integer> in) throws Exception { + return in._2(); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair + List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2)), + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2))); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o")), + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { + List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -570,7 +705,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( new Function<Integer, Integer>() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -668,7 +803,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( - new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; @@ -680,7 +815,7 @@ public class JavaAPISuite implements Serializable { } return Optional.of(out); } - }); + }); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);