diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 71b4e5bf1a80cdfb8e34075a123bfed1a24ca966..346151c147483974279fcaa2276e8c350674af71 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -147,7 +147,7 @@ object ActorWordCount {
      */
 
     val lines = ssc.actorStream[String](
-      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+      Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
         host, port.toInt))), "SampleReceiver")
 
     //compute wordcount
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 30985b4ebc13b2db14a86a6f6e3b14049efccbfd..51efe6cae84942b0d4b246d16e68a488045eca87 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
 import spark.api.java.function.{Function => JFunction}
 import spark.api.java.JavaRDD
 import spark.storage.StorageLevel
+import spark.RDD
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -26,7 +27,9 @@ import spark.storage.StorageLevel
  *  - A function that is used to generate an RDD after each time interval
  */
 class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
-    extends JavaDStreamLike[T, JavaDStream[T]] {
+    extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+
+  override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
   def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 3768eac62f8195545efca207417e2ffea8dc13ac..548809a359644d21785bc4f8c96951a42f1c07f3 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
 import scala.collection.JavaConversions._
 
 import spark.streaming._
-import spark.api.java.JavaRDD
+import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
 import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import java.util
 import spark.RDD
 import JavaDStream._
 
-trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
+    extends Serializable {
   implicit val classManifest: ClassManifest[T]
 
   def dstream: DStream[T]
 
+  def wrapRDD(in: RDD[T]): R
+
   implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
     in.map(new JLong(_))
   }
@@ -212,35 +215,35 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
   /**
-   * Return all the RDDs between 'fromDuration' to 'toDuration' (both included)
+   * Return all the RDDs between 'fromTime' and 'toTime' (both included)
    */
-  def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
-    new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
+  def slice(fromTime: Time, toTime: Time): JList[R] = {
+    new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq)
   }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * this DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
-    dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+  def foreach(foreachFunc: JFunction[R, Void]) {
+    dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
   }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * this DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
-    dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+  def foreach(foreachFunc: JFunction2[R, Time, Void]) {
+    dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
   }
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of this DStream.
    */
-  def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+  def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
     def scalaTransform (in: RDD[T]): RDD[U] =
-      transformFunc.call(new JavaRDD[T](in)).rdd
+      transformFunc.call(wrapRDD(in)).rdd
     dstream.transform(scalaTransform(_))
   }
 
@@ -248,11 +251,41 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of this DStream.
    */
-  def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
+  def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
     def scalaTransform (in: RDD[T], time: Time): RDD[U] =
-      transformFunc.call(new JavaRDD[T](in), time).rdd
+      transformFunc.call(wrapRDD(in), time).rdd
+    dstream.transform(scalaTransform(_, _))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
+  def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+  JavaPairDStream[K2, V2] = {
+    implicit val cmk: ClassManifest[K2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+    implicit val cmv: ClassManifest[V2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+    def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
+      transformFunc.call(wrapRDD(in)).rdd
+    dstream.transform(scalaTransform(_))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
+  def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+  JavaPairDStream[K2, V2] = {
+    implicit val cmk: ClassManifest[K2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+    implicit val cmv: ClassManifest[V2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+    def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
+      transformFunc.call(wrapRDD(in), time).rdd
     dstream.transform(scalaTransform(_, _))
   }
 
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ae403691abf97fdfc8c653f5d960568c20ec051f..30240cad988be32e82c701a0dd7b535156c3dcb4 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -15,11 +15,14 @@ import org.apache.hadoop.conf.Configuration
 import spark.api.java.{JavaRDD, JavaPairRDD}
 import spark.storage.StorageLevel
 import com.google.common.base.Optional
+import spark.RDD
 
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifiest: ClassManifest[K],
     implicit val vManifest: ClassManifest[V])
-    extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+    extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+
+  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 
   // =======================================================================
   // Methods common to all DStream's
@@ -73,36 +76,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
     dstream.union(that.dstream)
 
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
-   */
-  def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
-  JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassManifest[K2] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
-    implicit val cmv: ClassManifest[V2] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
-    def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] =
-      transformFunc.call(new JavaPairRDD[K, V](in)).rdd
-    dstream.transform(scalaTransform(_))
-  }
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of this DStream.
-   */
-  def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
-  JavaPairDStream[K2, V2] = {
-    implicit val cmk: ClassManifest[K2] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
-    implicit val cmv: ClassManifest[V2] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
-    def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
-      transformFunc.call(new JavaPairRDD[K, V](in), time).rdd
-    dstream.transform(scalaTransform(_, _))
-  }
-
   // =======================================================================
   // Methods only for PairDStream's
   // =======================================================================
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index a15b64c5d8d1912e1090dfec71a2ce2d80a710b5..d2a0ba725fdbfd8d0e485b2b9c49342c2a07a129 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Registers an output stream that will be computed every interval
    */
-  def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+  def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
     ssc.registerOutputStream(outputStream.dstream)
   }
 
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 17cd5ed7952e59dac92f9bd111a64b8dcd069a62..4530af5f6af5037b035b27bf3b305fdc7767b5d1 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -13,6 +13,7 @@ import scala.Tuple2;
 import spark.HashPartitioner;
 import spark.api.java.JavaPairRDD;
 import spark.api.java.JavaRDD;
+import spark.api.java.JavaRDDLike;
 import spark.api.java.JavaSparkContext;
 import spark.api.java.function.*;
 import spark.storage.StorageLevel;
@@ -294,8 +296,9 @@ public class JavaAPISuite implements Serializable {
         Arrays.asList(6,7,8),
         Arrays.asList(9,10,11));
 
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> transformed =
+        stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
       @Override
       public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
         return in.map(new Function<Integer, Integer>() {
@@ -921,6 +924,46 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void testPairToNormalRDDTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(1, 5),
+            new Tuple2<Integer, Integer>(4, 5),
+            new Tuple2<Integer, Integer>(2, 5)),
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(2, 5),
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(4, 5),
+            new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(3,1,4,2),
+        Arrays.asList(2,3,4,1));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaDStream<Integer> firstParts = pairStream.transform(
+        new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
+          @Override
+          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+            return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
+              @Override
+              public Integer call(Tuple2<Integer, Integer> in) {
+                return in._1();
+              }
+            });
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(firstParts);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+  @Test
   public void testMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 52ea28732ada89490a28c1d8f2366df491fb501d..64a7e7cbf9a367285b9a3704f781235dac099bbb 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
    * Attach a provided stream to it's associated StreamingContext as a
    * [[spark.streaming.TestOutputStream]].
    **/
-  def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
-    dstream: JavaDStreamLike[T, This]) = {
+  def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
+      R <: spark.api.java.JavaRDDLike[T, R]](
+    dstream: JavaDStreamLike[T, This, R]) = {
     implicit val cm: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
     val ostream = new TestOutputStream(dstream.dstream,