Commit bacfe5eb authored by Tathagata Das

Added JavaStreamingContext.transform

parent 9fccb17a
@@ -598,4 +598,15 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
new JavaPairRDD[K, V](rdd.rdd)
}
}
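A minimal usage sketch of the new fromJavaRDD helper from the Java side (an illustrative sketch, not from this commit; the class and variable names are made up). It converts a JavaRDD of Tuple2 key-value pairs into a JavaPairRDD so that pair operations become available:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class FromJavaRDDSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "FromJavaRDDSketch");
    // A JavaRDD whose elements happen to be key-value tuples.
    JavaRDD<Tuple2<String, Integer>> tuples = sc.parallelize(Arrays.asList(
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("b", 2)));
    // fromJavaRDD fills in the ClassManifests and returns a JavaPairRDD,
    // which exposes pair operations such as join and reduceByKey.
    JavaPairRDD<String, Integer> pairs = JavaPairRDD.fromJavaRDD(tuples);
    System.out.println(pairs.collect());
    sc.stop();
  }
}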
@@ -474,10 +474,10 @@ class StreamingContext private (
* the DStreams.
*/
def transform[T: ClassManifest](
streams: Seq[DStream[_]],
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
new TransformedDStream[T](streams, sparkContext.clean(transformFunc))
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
/**
@@ -711,6 +711,11 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
/** Convert to a JavaDStream */
def toJavaDStream(): JavaDStream[(K, V)] = {
new JavaDStream[(K, V)](dstream)
}
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}
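A short sketch of where toJavaDStream comes in (hypothetical method and stream names, not part of this commit): a JavaPairDStream cannot be placed directly into a List<JavaDStream<?>>, so it is converted first and passed along with ordinary DStreams, for example to the new JavaStreamingContext.transform further down.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class ToJavaDStreamSketch {
  // Collect heterogeneous streams into one list; the pair stream becomes a
  // JavaDStream of Tuple2 via toJavaDStream().
  static List<JavaDStream<?>> combine(JavaDStream<String> words,
                                      JavaPairDStream<String, Integer> counts) {
    return Arrays.<JavaDStream<?>>asList(words, counts.toJavaDStream());
  }
}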
@@ -33,7 +33,7 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
@@ -616,6 +616,54 @@ class JavaStreamingContext(val ssc: StreamingContext) {
new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
}
/**
* Create a new DStream in which each RDD is generated by applying a function to the RDDs
* of the DStreams. The order of the JavaRDDs in the transform function parameter will be
* the same as the order of the corresponding DStreams in the list. Note that to include a
* JavaPairDStream in the list of JavaDStreams, first convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream back
* to a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
*/
def transform[T](
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]]
): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
transformFunc.call(jrdds, time).rdd
}
ssc.transform(scalaDStreams, scalaTransformFunc)
}
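An illustrative Java sketch of this overload (assumed names; ssc, stream1 and stream2 would come from the surrounding program): the per-batch RDDs arrive in the same order as the DStreams in the list, and the function returns the RDD for the output JavaDStream.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TransformSketch {
  // Merge the per-batch RDDs of two integer streams into one stream.
  static JavaDStream<Integer> merged(JavaStreamingContext ssc,
                                     JavaDStream<Integer> stream1,
                                     JavaDStream<Integer> stream2) {
    List<JavaDStream<?>> inputs = Arrays.<JavaDStream<?>>asList(stream1, stream2);
    return ssc.transform(
        inputs,
        new Function2<List<JavaRDD<?>>, Time, JavaRDD<Integer>>() {
          @Override
          public JavaRDD<Integer> call(List<JavaRDD<?>> rdds, Time time) {
            // rdds.get(0) corresponds to stream1, rdds.get(1) to stream2.
            JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) rdds.get(0);
            JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) rdds.get(1);
            return rdd1.union(rdd2);
          }
        });
  }
}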
/**
* Create a new DStream in which each RDD is generated by applying a function to the RDDs
* of the DStreams. The order of the JavaRDDs in the transform function parameter will be
* the same as the order of the corresponding DStreams in the list. Note that to include a
* JavaPairDStream in the list of JavaDStreams, first convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream back
* to a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
*/
def transform[K, V](
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
transformFunc.call(jrdds, time).rdd
}
ssc.transform(scalaDStreams, scalaTransformFunc)
}
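A companion sketch for the pair-returning overload (again with assumed names), following the note in the doc comment: the JavaPairDStreams enter the list via toJavaDStream(), and their per-batch RDDs are turned back into JavaPairRDDs inside the function with JavaPairRDD.fromJavaRDD(). The added testStreamingContextTransform below exercises the same pattern with a join.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class PairTransformSketch {
  // Union two pair streams batch by batch, returning a JavaPairDStream.
  static JavaPairDStream<Integer, String> unionPerBatch(
      JavaStreamingContext ssc,
      JavaPairDStream<Integer, String> left,
      JavaPairDStream<Integer, String> right) {
    // The pair streams go into the list as JavaDStreams of Tuple2.
    List<JavaDStream<?>> inputs = Arrays.<JavaDStream<?>>asList(
        left.toJavaDStream(), right.toJavaDStream());
    return ssc.transform(
        inputs,
        new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, String>>() {
          @Override
          public JavaPairRDD<Integer, String> call(List<JavaRDD<?>> rdds, Time time) {
            // RDDs arrive in the same order as the DStreams in the list;
            // recover the pair view before using pair operations.
            JavaPairRDD<Integer, String> l = JavaPairRDD.fromJavaRDD(
                (JavaRDD<Tuple2<Integer, String>>) rdds.get(0));
            JavaPairRDD<Integer, String> r = JavaPairRDD.fromJavaRDD(
                (JavaRDD<Tuple2<Integer, String>>) rdds.get(1));
            return l.union(r);
          }
        });
  }
}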
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -292,8 +293,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9));
JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3));
JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6));
JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6));
JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
@@ -331,7 +332,6 @@ public class JavaAPISuite implements Serializable {
}
});
}
}
);
JavaTestUtils.attachTestOutputStream(transformed);
@@ -354,7 +354,8 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Integer> transformed1 = stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
@Override
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
return null;
}
}
@@ -421,51 +422,56 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("new york", "rangers")));
Arrays.asList(
new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(
new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "islanders")));
Arrays.asList(
new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(
new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("yankees", "mets"))),
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("rangers", "islanders"))));
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("yankees", "mets"))),
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream2, 1);
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
pairStream2,
new Function3 <
new Function3<
JavaPairRDD<String, String>,
JavaPairRDD<String, String>,
Time,
JavaPairRDD<String, Tuple2<String, String>>
>() {
@Override public JavaPairRDD<String, Tuple2<String, String>> call(
@Override
public JavaPairRDD<String, Tuple2<String, String>> call(
JavaPairRDD<String, String> rdd1,
JavaPairRDD<String, String> rdd2,
Time time
) throws Exception {
) throws Exception {
return rdd1.join(rdd2);
}
}
@@ -475,9 +481,9 @@ public class JavaAPISuite implements Serializable {
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -566,7 +572,6 @@ public class JavaAPISuite implements Serializable {
}
);
JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@@ -578,7 +583,74 @@ public class JavaAPISuite implements Serializable {
);
}
@Test
public void testStreamingContextTransform() {
List<List<Integer>> stream1input = Arrays.asList(
Arrays.asList(1),
Arrays.asList(2)
);
List<List<Integer>> stream2input = Arrays.asList(
Arrays.asList(3),
Arrays.asList(4)
);
List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
Arrays.asList(new Tuple2<Integer, String>(1, "x")),
Arrays.asList(new Tuple2<Integer, String>(2, "y"))
);
List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
);
JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
// This is just to test whether this transform to a JavaDStream compiles
JavaDStream<Long> transformed1 = ssc.transform(
listOfDStreams1,
new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
assert(listOfRDDs.size() == 2);
return null;
}
}
);
List<JavaDStream<?>> listOfDStreams2 =
Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
assert(listOfRDDs.size() == 3);
JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i);
}
};
return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
}
}
);
JavaTestUtils.attachTestOutputStream(transformed2);
List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),