diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 30985b4ebc13b2db14a86a6f6e3b14049efccbfd..51efe6cae84942b0d4b246d16e68a488045eca87 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
 import spark.api.java.function.{Function => JFunction}
 import spark.api.java.JavaRDD
 import spark.storage.StorageLevel
+import spark.RDD
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -26,7 +27,9 @@ import spark.storage.StorageLevel
  *  - A function that is used to generate an RDD after each time interval
  */
 class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
-    extends JavaDStreamLike[T, JavaDStream[T]] {
+    extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+
+  override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
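
With the third type parameter in place, JavaDStream pins the wrapper type R to JavaRDD[T] via wrapRDD, so the output operations it inherits from JavaDStreamLike keep handing batches back as JavaRDD. A rough caller-side sketch, assuming a hypothetical JavaDStream<String> named words and the usual spark.api.java and spark.api.java.function imports:

    words.foreach(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) {
        // For a plain stream the callback still receives a JavaRDD, as before.
        System.out.println("batch size: " + rdd.count());
        return null;
      }
    });
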
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 1c1ba05ff98b5821e685e2ab5aa9504ed65e6378..4e1458ca9e36ef3641995e3643d831886720a187 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
 import scala.collection.JavaConversions._
 
 import spark.streaming._
-import spark.api.java.JavaRDD
+import spark.api.java.{JavaRDDLike, JavaRDD}
 import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import java.util
 import spark.RDD
 import JavaDStream._
 
-trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
+    extends Serializable {
   implicit val classManifest: ClassManifest[T]
 
   def dstream: DStream[T]
 
+  def wrapRDD(in: RDD[T]): R
+
   implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
     in.map(new JLong(_))
   }
@@ -220,16 +223,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * this DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
-    dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+  def foreach(foreachFunc: JFunction[R, Void]) {
+    dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
   }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * this DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
-    dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+  def foreach(foreachFunc: JFunction2[R, Time, Void]) {
+    dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
   }
 
   /**
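
JavaDStreamLike now carries the Java-side RDD wrapper as a third type parameter and defers the wrapping itself to the abstract wrapRDD hook, so output operators such as foreach can hand back the most specific wrapper each concrete stream provides. One way to see why the extra parameter helps is generic code that works over any stream flavour; the helper below is a hypothetical sketch, not part of this patch:

    // Hypothetical helper: R names whichever wrapper the concrete stream uses,
    // so the same code serves JavaDStream and JavaPairDStream alike.
    static <T, This extends JavaDStreamLike<T, This, R>, R extends JavaRDDLike<T, R>>
        void printBatchSizes(JavaDStreamLike<T, This, R> stream) {
      stream.foreach(new Function<R, Void>() {
        @Override
        public Void call(R rdd) {
          System.out.println(rdd.count());  // count() is available on any JavaRDDLike
          return null;
        }
      });
    }
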
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 952ca657bf77be3ddb79fa0b3a0d3c5bf39e891a..de3e8023006281ed5d69a71eb5519e2e8e488477 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -19,7 +19,9 @@ import com.google.common.base.Optional
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifiest: ClassManifest[K],
     implicit val vManifest: ClassManifest[V])
-    extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+    extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+
+  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 
   // =======================================================================
   // Methods common to all DStream's
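
The payoff shows up on pair streams: wrapRDD now yields a JavaPairRDD, so a foreach callback sees key/value operations directly instead of a plain JavaRDD of tuples. A hedged usage sketch, assuming a hypothetical JavaPairDStream<String, Integer> named wordCounts:

    wordCounts.foreach(new Function<JavaPairRDD<String, Integer>, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd) {
        // Pair-specific operations are available on the callback argument.
        JavaPairRDD<String, Integer> summed = rdd.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
              @Override
              public Integer call(Integer a, Integer b) { return a + b; }
            });
        System.out.println(summed.collect());
        return null;
      }
    });
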
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d9a676819aa6a96d0385480c80216a5d964b187e..878e179589c0bba1710ebb91394071e1e3087819 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Registers an output stream that will be computed every interval
    */
-  def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+  def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
     ssc.registerOutputStream(outputStream.dstream)
   }
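
Since JavaDStream and JavaPairDStream each fix their own R, the widened existential still accepts both flavours. A hypothetical sketch, reusing the stream names assumed in the earlier sketches:

    ssc.registerOutputStream(words);       // JavaDStream<String>
    ssc.registerOutputStream(wordCounts);  // JavaPairDStream<String, Integer>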
 
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 5d510fd89f141b388a0cb3c03d426cd1510cb2fb..4fe2de5a1a764b8288a159565d239c2407c33677 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -12,6 +12,7 @@ import org.junit.Test;
 import scala.Tuple2;
 import spark.HashPartitioner;
 import spark.api.java.JavaRDD;
+import spark.api.java.JavaRDDLike;
 import spark.api.java.JavaSparkContext;
 import spark.api.java.function.*;
 import spark.storage.StorageLevel;
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 52ea28732ada89490a28c1d8f2366df491fb501d..64a7e7cbf9a367285b9a3704f781235dac099bbb 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
   * Attach a provided stream to its associated StreamingContext as a
    * [[spark.streaming.TestOutputStream]].
    **/
-  def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
-    dstream: JavaDStreamLike[T, This]) = {
+  def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
+      R <: spark.api.java.JavaRDDLike[T, R]](
+    dstream: JavaDStreamLike[T, This, R]) = {
     implicit val cm: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
     val ostream = new TestOutputStream(dstream.dstream,