Skip to content
Snippets Groups Projects
Commit 20cf7705 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Fix for flatmap

parent 314d87a0
No related branches found
No related tags found
No related merge requests found
......@@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
......
......@@ -562,6 +562,48 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
@Test
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("hi", 1),
new Tuple2<String, Integer>("ho", 2)),
Arrays.asList(
new Tuple2<String, Integer>("hi", 1),
new Tuple2<String, Integer>("ho", 2)));
List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<Integer, String>(1, "h"),
new Tuple2<Integer, String>(1, "i"),
new Tuple2<Integer, String>(2, "h"),
new Tuple2<Integer, String>(2, "o")),
Arrays.asList(
new Tuple2<Integer, String>(1, "h"),
new Tuple2<Integer, String>(1, "i"),
new Tuple2<Integer, String>(2, "h"),
new Tuple2<Integer, String>(2, "o")));
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
for (Character s: in._1().toCharArray()) {
out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
}
return out;
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment