Skip to content
Snippets Groups Projects
Commit 9d49a6b0 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Use RDD type for `foreach` operator in Java.

parent 8b9c673f
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream} ...@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction} import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD import spark.api.java.JavaRDD
import spark.storage.StorageLevel import spark.storage.StorageLevel
import spark.RDD
/** /**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
...@@ -26,7 +27,9 @@ import spark.storage.StorageLevel ...@@ -26,7 +27,9 @@ import spark.storage.StorageLevel
* - A function that is used to generate an RDD after each time interval * - A function that is used to generate an RDD after each time interval
*/ */
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] { extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
/** Return a new DStream containing only the elements that satisfy a predicate. */ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
......
...@@ -6,17 +6,20 @@ import java.lang.{Long => JLong} ...@@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import spark.streaming._ import spark.streaming._
import spark.api.java.JavaRDD import spark.api.java.{JavaRDDLike, JavaRDD}
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import java.util import java.util
import spark.RDD import spark.RDD
import JavaDStream._ import JavaDStream._
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
extends Serializable {
implicit val classManifest: ClassManifest[T] implicit val classManifest: ClassManifest[T]
def dstream: DStream[T] def dstream: DStream[T]
def wrapRDD(in: RDD[T]): R
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = { implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
in.map(new JLong(_)) in.map(new JLong(_))
} }
...@@ -220,16 +223,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable ...@@ -220,16 +223,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Apply a function to each RDD in this DStream. This is an output operator, so * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized. * this DStream will be registered as an output stream and therefore materialized.
*/ */
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) { def foreach(foreachFunc: JFunction[R, Void]) {
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
} }
/** /**
* Apply a function to each RDD in this DStream. This is an output operator, so * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized. * this DStream will be registered as an output stream and therefore materialized.
*/ */
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) { def foreach(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
} }
/** /**
......
...@@ -19,7 +19,9 @@ import com.google.common.base.Optional ...@@ -19,7 +19,9 @@ import com.google.common.base.Optional
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K], implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
// ======================================================================= // =======================================================================
// Methods common to all DStream's // Methods common to all DStream's
......
...@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/** /**
* Registers an output stream that will be computed every interval * Registers an output stream that will be computed every interval
*/ */
def registerOutputStream(outputStream: JavaDStreamLike[_, _]) { def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
ssc.registerOutputStream(outputStream.dstream) ssc.registerOutputStream(outputStream.dstream)
} }
......
...@@ -12,6 +12,7 @@ import org.junit.Test; ...@@ -12,6 +12,7 @@ import org.junit.Test;
import scala.Tuple2; import scala.Tuple2;
import spark.HashPartitioner; import spark.HashPartitioner;
import spark.api.java.JavaRDD; import spark.api.java.JavaRDD;
import spark.api.java.JavaRDDLike;
import spark.api.java.JavaSparkContext; import spark.api.java.JavaSparkContext;
import spark.api.java.function.*; import spark.api.java.function.*;
import spark.storage.StorageLevel; import spark.storage.StorageLevel;
......
...@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
* Attach a provided stream to it's associated StreamingContext as a * Attach a provided stream to it's associated StreamingContext as a
* [[spark.streaming.TestOutputStream]]. * [[spark.streaming.TestOutputStream]].
**/ **/
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
dstream: JavaDStreamLike[T, This]) = { R <: spark.api.java.JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassManifest[T] = implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream, val ostream = new TestOutputStream(dstream.dstream,
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment