Skip to content
Snippets Groups Projects
Commit 3bcc6e5c authored by Tathagata Das
Browse files

Merge pull request #466 from pwendell/java-stream-transform

STREAMING-50: Support transform workaround in JavaPairDStream
parents e8663e0f 3f3e77f2
No related branches found
No related tags found
No related merge requests found
......@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import spark.Partitioner
import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.JavaPairRDD
import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
......@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
 * Combine this stream with `that` by delegating to `union` on the two wrapped
 * Scala DStreams.
 *
 * @param that another pair stream with the same key and value types
 * @return the unioned stream, exposed again as a JavaPairDStream
 */
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = {
  val merged = dstream.union(that.dstream)
  // The Scala DStream is adapted to the declared JavaPairDStream return type implicitly.
  merged
}
/**
 * Build a new DStream by running `transformFunc` over every RDD of this DStream.
 *
 * Each Scala RDD is wrapped in a `JavaPairRDD` before it is handed to the Java
 * function, and the pair RDD the function returns is unwrapped again through `.rdd`.
 *
 * @param transformFunc Java function mapping one RDD of this stream to a pair RDD
 * @tparam K2 key type of the resulting stream
 * @tparam V2 value type of the resulting stream
 * @return the transformed stream as a JavaPairDStream
 */
def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
    JavaPairDStream[K2, V2] = {
  // No manifests for K2/V2 are passed in, so the AnyRef manifest is cast to stand
  // in for both of them.
  val anyRefManifest = implicitly[ClassManifest[AnyRef]]
  implicit val manifestK2: ClassManifest[K2] =
    anyRefManifest.asInstanceOf[ClassManifest[K2]]
  implicit val manifestV2: ClassManifest[V2] =
    anyRefManifest.asInstanceOf[ClassManifest[V2]]

  // Scala-side adapter around the Java function: wrap, call, unwrap.
  val adapted: RDD[(K, V)] => RDD[(K2, V2)] =
    in => transformFunc.call(new JavaPairRDD[K, V](in)).rdd

  dstream.transform(adapted)
}
/**
 * Build a new DStream by running `transformFunc` over every RDD of this DStream,
 * passing along the `Time` value that the underlying DStream supplies with each RDD.
 *
 * Each Scala RDD is wrapped in a `JavaPairRDD` before it is handed to the Java
 * function, and the pair RDD the function returns is unwrapped again through `.rdd`.
 *
 * @param transformFunc Java function mapping one RDD of this stream and its `Time`
 *                      to a pair RDD
 * @tparam K2 key type of the resulting stream
 * @tparam V2 value type of the resulting stream
 * @return the transformed stream as a JavaPairDStream
 */
def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
    JavaPairDStream[K2, V2] = {
  // No manifests for K2/V2 are passed in, so the AnyRef manifest is cast to stand
  // in for both of them.
  val anyRefManifest = implicitly[ClassManifest[AnyRef]]
  implicit val manifestK2: ClassManifest[K2] =
    anyRefManifest.asInstanceOf[ClassManifest[K2]]
  implicit val manifestV2: ClassManifest[V2] =
    anyRefManifest.asInstanceOf[ClassManifest[V2]]

  // Scala-side adapter around the Java function: wrap, call with the time, unwrap.
  val adapted: (RDD[(K, V)], Time) => RDD[(K2, V2)] =
    (in, time) => transformFunc.call(new JavaPairRDD[K, V](in), time).rdd

  dstream.transform(adapted)
}
// =======================================================================
// Methods only for PairDStream's
// =======================================================================
......
......@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
......@@ -872,6 +873,50 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
/**
 * Checks that {@code JavaPairDStream.transform} applies the supplied function to
 * each batch: two batches of unsorted (key, 5) pairs are sorted by key and the
 * collected output is compared with the expected ordering.
 */
@Test
public void testPairTransform() {
  List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
      Arrays.asList(
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(4, 5),
          new Tuple2<Integer, Integer>(2, 5)),
      Arrays.asList(
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5),
          new Tuple2<Integer, Integer>(1, 5)));

  List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5)),
      Arrays.asList(
          new Tuple2<Integer, Integer>(1, 5),
          new Tuple2<Integer, Integer>(2, 5),
          new Tuple2<Integer, Integer>(3, 5),
          new Tuple2<Integer, Integer>(4, 5)));

  JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
      ssc, inputData, 1);
  JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
      new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
        @Override
        public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in)
            throws Exception {
          return in.sortByKey();
        }
      });

  JavaTestUtils.attachTestOutputStream(sorted);
  // The element type must match the stream under test (Integer pairs, not String pairs).
  List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

  Assert.assertEquals(expected, result);
}
@Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment