Skip to content
Snippets Groups Projects
Commit 3f3e77f2 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

STREAMING-50: Support transform workaround in JavaPairDStream

This ports a useful workaround (the `transform` function) to
JavaPairDStream. It is needed for operations such as sorting, which
are not yet supported in the core streaming API.
parent fd7e414b
No related branches found
No related tags found
No related merge requests found
......@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import spark.Partitioner
import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.JavaPairRDD
import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
......@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = {
  // Union the two wrapped Scala DStreams; the result is turned back into a
  // JavaPairDStream by the implicit conversion applied at the return position.
  val merged: DStream[(K, V)] = dstream.union(that.dstream)
  merged
}
/**
 * Build a new DStream by running `transformFunc` over every RDD of this DStream.
 *
 * @param transformFunc Java function that receives each RDD wrapped as a
 *                      [[JavaPairRDD]] and returns the replacement pair RDD
 * @tparam K2 key type of the resulting stream
 * @tparam V2 value type of the resulting stream
 * @return a pair DStream made of the RDDs returned by `transformFunc`
 */
def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
    JavaPairDStream[K2, V2] = {
  // No manifests for K2/V2 are passed to this method, so the AnyRef manifest is
  // reused through a cast. They are implicit so that implicit resolution in the
  // remainder of this method can pick them up.
  implicit val k2Manifest: ClassManifest[K2] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
  implicit val v2Manifest: ClassManifest[V2] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]

  // Adapter: wrap the Scala RDD for the Java function, then unwrap its result.
  val applyToRdd: RDD[(K, V)] => RDD[(K2, V2)] =
    (rdd: RDD[(K, V)]) => {
      val wrapped = new JavaPairRDD[K, V](rdd)
      transformFunc.call(wrapped).rdd
    }

  dstream.transform(applyToRdd)
}
/**
 * Build a new DStream by running `transformFunc` over every RDD of this DStream.
 * This overload also hands the function the `Time` value that the underlying
 * DStream supplies alongside each RDD.
 *
 * @param transformFunc Java function that receives each RDD wrapped as a
 *                      [[JavaPairRDD]] together with a `Time`, and returns the
 *                      replacement pair RDD
 * @tparam K2 key type of the resulting stream
 * @tparam V2 value type of the resulting stream
 * @return a pair DStream made of the RDDs returned by `transformFunc`
 */
def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
    JavaPairDStream[K2, V2] = {
  // No manifests for K2/V2 are passed to this method, so the AnyRef manifest is
  // reused through a cast. They are implicit so that implicit resolution in the
  // remainder of this method can pick them up.
  implicit val k2Manifest: ClassManifest[K2] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
  implicit val v2Manifest: ClassManifest[V2] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]

  // Adapter: wrap the Scala RDD for the Java function, forward the time
  // unchanged, then unwrap the result.
  val applyToRdd: (RDD[(K, V)], Time) => RDD[(K2, V2)] =
    (rdd: RDD[(K, V)], time: Time) => {
      val wrapped = new JavaPairRDD[K, V](rdd)
      transformFunc.call(wrapped, time).rdd
    }

  dstream.transform(applyToRdd)
}
// =======================================================================
// Methods only for PairDStream's
// =======================================================================
......
......@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
......@@ -872,6 +873,50 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
/**
 * Checks that {@code JavaPairDStream.transform} applies the supplied function to
 * each RDD of the stream: two batches of unsorted (key, 5) pairs are passed
 * through {@code sortByKey()} and the collected output is compared against the
 * same pairs in ascending key order.
 */
@Test
public void testPairTransform() {
  // Two input batches holding keys 1..4 in scrambled order.
  List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
      Arrays.asList(
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(4, 5),
          new Tuple2<Integer, Integer>(2, 5)),
      Arrays.asList(
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5),
          new Tuple2<Integer, Integer>(1, 5)));

  // Each batch is expected to come out ordered by key.
  List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5)),
      Arrays.asList(
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5)));

  // NOTE(review): the trailing 1 is presumably the number of partitions - confirm
  // against JavaTestUtils.attachTestInputStream.
  JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
      ssc, inputData, 1);
  JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
      new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
        @Override
        public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in)
            throws Exception {
          return in.sortByKey();
        }
      });

  JavaTestUtils.attachTestOutputStream(sorted);
  // The element type must match the stream (Integer pairs). The previous
  // Tuple2<String, String> declaration only compiled because the generic return
  // type is inferred from the assignment target.
  // NOTE(review): the two 2s are presumably batches to run and outputs to expect - confirm.
  List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

  Assert.assertEquals(expected, result);
}
@Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment