diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 3273817c78562f03ec6629d5c229159968db34b0..4e8a680a75d07cc8ff54dfe9cc0bb2f9feb361af 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -167,7 +167,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
 </tr>
 </table>
 
-A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).
+A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.dstream.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
 
 ## Output Operations
 When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
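
A minimal sketch (not part of this patch; master, host, and port are placeholders) of a DStream transformation followed by an output operator, with `DStream` imported from its new `org.apache.spark.streaming.dstream` package:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // implicit pair-DStream conversions
import org.apache.spark.streaming.dstream.DStream

object WordCountSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "WordCountSketch", Seconds(1))
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    counts.print() // output operator: triggers computation of the stream
    ssc.start()
    ssc.awaitTermination()
  }
}
```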
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9418ccb87e759aeaaa52ba7b5a9d17aeee..8c5d0bd56845bc8d2262af4f339c0e85a0660b6e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -82,7 +82,7 @@ object RecoverableNetworkWordCount {
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
       val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
       println(counts)
       println("Appending to " + outputFile.getAbsolutePath)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 834b775d4fd2b09ea1c2a24dce48af963c57871b..d53b66dd4677141518ca671d77957039ad06543c 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 object FlumeUtils {
   /**
@@ -42,6 +43,7 @@ object FlumeUtils {
 
   /**
    * Creates an input stream from a Flume source.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname Hostname of the slave machine to which the Flume data will be sent
    * @param port     Port of the slave machine to which the Flume data will be sent
    */
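
To illustrate the documented default, a spark-shell style sketch (hostname and ports are placeholders):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext("local[2]", "FlumeSketch", Seconds(1))
// Omitting the storage level uses the default, StorageLevel.MEMORY_AND_DISK_SER_2
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)
// An explicit level overrides the default
val flumeStream2 = FlumeUtils.createStream(ssc, "localhost", 41415, StorageLevel.MEMORY_ONLY)
```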
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index c2d851f94311d48fe7ce19ca6a1d14c49d5c48e4..37c03be4e77ade881b58e4a76b3f0de7ac682cd8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -26,8 +26,9 @@ import java.util.{Map => JMap}
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 
 object KafkaUtils {
@@ -77,6 +78,7 @@ object KafkaUtils {
 
   /**
    * Create an input stream that pulls messages from a Kafka broker.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
    * @param groupId   The group id for this consumer
@@ -127,7 +129,7 @@ object KafkaUtils {
    *                    see http://kafka.apache.org/08/configuration.html
    * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *                in its own thread
-   * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+   * @param storageLevel RDD storage level.
    */
   def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
       jssc: JavaStreamingContext,
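
The same default applies on the Scala side of KafkaUtils; a spark-shell style sketch (quorum, group, and topic names are placeholders):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext("local[2]", "KafkaSketch", Seconds(2))
// Map of topic -> number of consumer threads; storage level defaults to MEMORY_AND_DISK_SER_2
val messages = KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "my-consumer-group", Map("my-topic" -> 1))
```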
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 0e6c25dbee8fbe5e63f0648c890143e9a4ae4509..3636e46bb82576013e6362c1faa84c974959f12e 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
 
 object MQTTUtils {
   /**
@@ -43,6 +44,7 @@ object MQTTUtils {
 
   /**
    * Create an input stream that receives messages pushed by an MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param brokerUrl URL of the remote MQTT publisher
    * @param topic     Topic name to subscribe to
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 5e506ffabcfc4a796bc0cf1a10247ad6023d7757..b8bae7b6d33855fa20f8aeee2391260026044a93 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming.twitter
 import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.DStream
 
 object TwitterUtils {
   /**
@@ -50,6 +51,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc   JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
@@ -61,6 +63,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc    JavaStreamingContext object
    * @param filters Set of filter strings to get only those tweets that match them
    */
@@ -87,6 +90,7 @@ object TwitterUtils {
 
   /**
    * Create an input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    */
@@ -96,6 +100,7 @@ object TwitterUtils {
 
   /**
    * Create an input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    * @param filters     Set of filter strings to get only those tweets that match them
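
A sketch of the documented behavior (all credential and filter values are placeholders): with no explicit Authorization or storage level, the stream uses the twitter4j system properties and MEMORY_AND_DISK_SER_2.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// OAuth credentials are read from system properties when no Authorization is passed
System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")

val ssc = new StreamingContext("local[2]", "TwitterSketch", Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None, Seq("spark")) // default storage level
```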
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 546d9df3b5df9b0a8232a74356dba607151c9ad8..7a14b3d2bf27859c346f4490cac61acd83e68a69 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -25,8 +25,9 @@ import akka.zeromq.Subscribe
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 object ZeroMQUtils {
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 668e5324e6b64c8a279adefb67ae30d5b8263a7f..8faa79f8c7e9d8b48bff46497cfb0a2550e15ee5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream}
+import scala.collection.mutable.ArrayBuffer
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import collection.mutable.ArrayBuffer
 import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
 
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
-        throw new Exception("Batch duration already set as " + batchDuration +
+        throw new Exception("Remember duration already set as " + batchDuration +
           ". cannot set it again.")
       }
       rememberDuration = duration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae902be11adff1aef4999e85f29877415615..7b279334034ab9adfa5ab67aa61df75e27a1ac2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Set the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
    *                  Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
   def actorStream[T: ClassTag](
       props: Props,
       name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
+   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    * @tparam T            Type of the objects in the received blocks
    */
   def rawSocketStream[T: ClassTag](
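
A sketch of the checkpoint-based driver recovery the reworded scaladoc describes (paths and names are placeholders), following the getOrCreate pattern used in RecoverableNetworkWordCount:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/checkpoints/myapp" // must be fault-tolerant storage

def createContext(): StreamingContext = {
  val ssc = new StreamingContext("local[2]", "CheckpointSketch", Seconds(1))
  ssc.checkpoint(checkpointDir) // periodically checkpoint DStream operations for driver fault-tolerance
  // ... define the DStream graph here ...
  ssc
}

// Recreate the context from checkpoint data if present; otherwise build it fresh
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
```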
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d29033df3223f31f6c1ee807ad996ae5773c77c2..c92854ccd9a28030dae44d72bfc7b0376c14d60c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.streaming.api.java
 
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index cea4795eb527be76eeb6251717f6335d45a28f43..1ec4492bcab9b3c7cc4125dcc64f1d30672b1b68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
 import java.util
 import org.apache.spark.rdd.RDD
 import JavaDStream._
+import org.apache.spark.streaming.dstream.DStream
 
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
     extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 6c3467d4056d721a90e0efc55e903862af12526e..6bb985ca540fff6143e645c5f142498d7a43c492 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel
 import com.google.common.base.Optional
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.streaming.dstream.DStream
 
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifest: ClassTag[K],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b4c46f5e506ac1403efc0efe93eb30988cd1b40d..a2f0b88cb094f7c8fc8e47c3210d54c7ea3354b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamingListener
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.dstream.DStream
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -150,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
@@ -160,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Create an input stream from a network source hostname:port. Data is received using
    * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
-   * lines.
+   * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    */
@@ -301,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create an input stream with an arbitrary user-implemented actor receiver.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    *
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
similarity index 99%
rename from streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f760093579ebff31c22fd8a62f1f4d33997d6674..a7c4cca7eacab61ade04b4636d89c765b89890b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
 
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
-import StreamingContext._
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.scheduler.Job
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -42,7 +43,7 @@ import org.apache.spark.util.MetadataCleaner
  * by a parent DStream.
  *
  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
  * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
  * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
  * implicit conversions when `org.apache.spark.streaming.StreamingContext._` is imported.
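
The import the scaladoc refers to, sketched in spark-shell style (host and port are placeholders):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // enables PairDStreamFunctions implicitly
import org.apache.spark.streaming.dstream.DStream

val ssc = new StreamingContext("local[2]", "PairOpsSketch", Seconds(1))
val pairs: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val grouped = pairs.groupByKeyAndWindow(Seconds(3)) // available via the implicit conversion
val joined  = pairs.join(pairs)
```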
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
similarity index 97%
rename from streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 671f7bbce7433565f47dbec8804e2990b0dee0a1..2da4127f47f142d882ba6e19c10b6c40b5f84306 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-
+import java.io.{ObjectInputStream, IOException}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
-
 import org.apache.spark.Logging
-
-import java.io.{ObjectInputStream, IOException}
+import org.apache.spark.streaming.Time
 
 private[streaming]
 class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index f10d48363474a8b952650ef7a71f465f900d8aa3..37c46b26a50b54bdd1b213b2fb627aa0ea895709 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.util.TimeStampedHashMap
 
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index db2e0a4ceef0366ca0deefd5650df1ed0f336d94..c81534ae584ea05e1fc14800544dba1ffc326668 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 244dc3ee4fa143c8bde0bc08045c67545dc6929c..658623455498ceb038915e791ac04c502f2f1909 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 336c4b7a92dc6c3754eb16436c34fa6ba4d2ea18..c7bb2833eabb8878cecc6cbc9beb50b4b6ad4227 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 364abcde68c95125d887a6ed0b40ad52611b63eb..905bc723f69a9e27dcccedbe6399240c32f5168e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 23136f44fa3103d76bfe13a6a4d9ba21706db9c1..a9bb51f05404833487d7e9f8579df09345b0884e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 8f84232cab3485f4c8d78200312f034214dcb9d8..a1075ad304ef6c320cb2a05c04355446de516c5a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
 
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 8a04060e5b6c11360fbcec5d02777aee7cf0753f..3d8ee29df1e821fce5e48a152643071714a25d7c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 0ce364fd4632829d3b7f80945e27633626d5e346..7aea1f945d9db60d059aa09b7f94474854e2e5d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index c0b7491d096cd64bc37d7b2d5ce97ba00feded48..02704a8d1c2e0757d4ca1ff6cdc8838ded58e9b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
similarity index 99%
rename from streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 69d80c37112375de3d63d7d80b027a7339b0dff0..6b3e48382e0c403aaad5ecd25d1ce20baae7db37 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream._
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.{Time, Duration}
 
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index db56345ca84fb3749cdce96b3968d103cbbcc9f0..7a6b1ea35eb13163e15f0a77ec19f67a56bc0b9a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming.{Duration, Interval, Time}
 
 import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 84e69f277b22e97d2ac8303bc5d784faa8b96b06..880a89bc368956ce575f9a5489a5272cb6da6784 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.Partitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag
 
 private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index b34ba7b9b436fcd2f7361e0664c084d22f15f7d5..9d8889b6553566c1249400d0dae09ce1e3146e00 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.Partitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
 
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index aeea060df7161fe33206a40e488a780bb1dedd9d..7cd4554282ca18a725eec959703948bb7b496436 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag
 
 private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 0d84ec84f2c6335e98316a3263fbeae2d520d77d..4ecba03ab5d2f84691d75b26c3caffdb4e362b65 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
-import collection.mutable.ArrayBuffer
 import org.apache.spark.rdd.UnionRDD
 
 import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff935bb415183abe74e943ed953e4531406..6301772468737afe993c30cd3a84989ce57e7258 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
   extends DStream[T](parent.ssc) {
 
   if (!_windowDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
   if (!_slideDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
+  // Persist the parent DStream by default, as its RDDs are obviously going to be reused across windows.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
 
   override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
+  override def persist(level: StorageLevel): DStream[T] = {
+    // Do not let this windowed DStream be persisted, as windowed (unioned) RDDs share underlying
+    // RDDs, and persisting the windowed RDDs would store numerous copies of the underlying data.
+    // Instead, control the persistence of the parent DStream.
+    parent.persist(level)
+    this
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
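
A sketch of the new persist semantics (placeholder source): calling persist on a windowed stream now forwards to its parent, matching the test added in WindowOperationsSuite below.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "WindowPersistSketch", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val windowed = lines.window(Seconds(3))
// Persisting the window actually persists `lines`; `windowed` itself stays at StorageLevel.NONE
windowed.persist(StorageLevel.MEMORY_ONLY)
```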
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 592e84791bf131252e07c30e68b4709db7339891..be67af3a6466a6a65a875576a3fbfe93b820d7a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.util
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ForEachDStream
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
 import StreamingContext._
 
 import scala.util.Random
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9406e0e20a403e83cf7689e63b8faa3421556104..7037aae234208a3bc387829287513dabc4a9f6c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
 
 import util.ManualClock
 import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.streaming.dstream.DStream
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 67ce5bc566b1540124a985095c9a2345f6eba701..0c68c44ddb6da9960b3869b941b9de650134fbc0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.SparkConf
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a477d200c91e37ce97ce0e6fe731df2f6899e9b9..f7f3346f81db58607903a92527816f55ae48a8fe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkException, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, MetadataCleaner}
+import org.apache.spark.streaming.dstream.DStream
 
 class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 
@@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
     inputStream.map(x => { throw new TestException("error in map task"); x})
-               .foreach(_.count)
+               .foreachRDD(_.count)
 
     val exception = intercept[Exception] {
       ssc.start()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index fa6414209605405e2a70834409bb3851e10b6422..9e0f2c900e8ba699c802d202839b0edeb2525ec4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
 import org.apache.spark.streaming.scheduler._
 import scala.collection.mutable.ArrayBuffer
 import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.streaming.dstream.DStream
 
 class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9b2bb57e7798b56710f0b329c167c087dde558be..535e5bd1f1f2e7f8786c815a2f933274e88de2f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
 import org.apache.spark.streaming.util.ManualClock
 
 import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index c39abfc21b3ba219a08c6bf6e514db052dbcd7f0..471c99fab4682513c6216e5e406e093597e766bc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
 
 class WindowOperationsSuite extends TestSuiteBase {
 
@@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(3)
   )
 
+  test("window - persistence level") {
+    val input = Seq(Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+    val ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = new TestInputStream[Int](ssc, input, 1)
+    val windowStream1 = inputStream.window(batchDuration * 2)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+    windowStream1.persist(StorageLevel.MEMORY_ONLY)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+    ssc.stop()
+  }
+
   // Testing naive reduceByKeyAndWindow (without invertible function)
 
   testReduceByKeyAndWindow(
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index f670f65bf5b38ca6c56703cc955c2f2b9236a751..4886cd6ea8a64a1269dfd1f6fdb229d074b6f583 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.api.java._
 import org.apache.spark.rdd.{RDD, DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions}
-import org.apache.spark.streaming.{PairDStreamFunctions, DStream, StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
 
 
 private[spark] abstract class SparkType(val name: String)
@@ -147,7 +148,7 @@ object JavaAPICompletenessChecker {
               } else {
                 ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
               }
-            case "org.apache.spark.streaming.DStream" =>
+            case "org.apache.spark.streaming.dstream.DStream" =>
               if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
                 val tupleParams =
                   parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
@@ -248,30 +249,29 @@ object JavaAPICompletenessChecker {
       "org.apache.spark.SparkContext.getSparkHome",
       "org.apache.spark.SparkContext.executorMemoryRequested",
       "org.apache.spark.SparkContext.getExecutorStorageStatus",
-      "org.apache.spark.streaming.DStream.generatedRDDs",
-      "org.apache.spark.streaming.DStream.zeroTime",
-      "org.apache.spark.streaming.DStream.rememberDuration",
-      "org.apache.spark.streaming.DStream.storageLevel",
-      "org.apache.spark.streaming.DStream.mustCheckpoint",
-      "org.apache.spark.streaming.DStream.checkpointDuration",
-      "org.apache.spark.streaming.DStream.checkpointData",
-      "org.apache.spark.streaming.DStream.graph",
-      "org.apache.spark.streaming.DStream.isInitialized",
-      "org.apache.spark.streaming.DStream.parentRememberDuration",
-      "org.apache.spark.streaming.DStream.initialize",
-      "org.apache.spark.streaming.DStream.validate",
-      "org.apache.spark.streaming.DStream.setContext",
-      "org.apache.spark.streaming.DStream.setGraph",
-      "org.apache.spark.streaming.DStream.remember",
-      "org.apache.spark.streaming.DStream.getOrCompute",
-      "org.apache.spark.streaming.DStream.generateJob",
-      "org.apache.spark.streaming.DStream.clearOldMetadata",
-      "org.apache.spark.streaming.DStream.addMetadata",
-      "org.apache.spark.streaming.DStream.updateCheckpointData",
-      "org.apache.spark.streaming.DStream.restoreCheckpointData",
-      "org.apache.spark.streaming.DStream.isTimeValid",
+      "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
+      "org.apache.spark.streaming.dstream.DStream.zeroTime",
+      "org.apache.spark.streaming.dstream.DStream.rememberDuration",
+      "org.apache.spark.streaming.dstream.DStream.storageLevel",
+      "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
+      "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
+      "org.apache.spark.streaming.dstream.DStream.checkpointData",
+      "org.apache.spark.streaming.dstream.DStream.graph",
+      "org.apache.spark.streaming.dstream.DStream.isInitialized",
+      "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
+      "org.apache.spark.streaming.dstream.DStream.initialize",
+      "org.apache.spark.streaming.dstream.DStream.validate",
+      "org.apache.spark.streaming.dstream.DStream.setContext",
+      "org.apache.spark.streaming.dstream.DStream.setGraph",
+      "org.apache.spark.streaming.dstream.DStream.remember",
+      "org.apache.spark.streaming.dstream.DStream.getOrCompute",
+      "org.apache.spark.streaming.dstream.DStream.generateJob",
+      "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
+      "org.apache.spark.streaming.dstream.DStream.addMetadata",
+      "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
+      "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
+      "org.apache.spark.streaming.dstream.DStream.isTimeValid",
       "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
-      "org.apache.spark.streaming.StreamingContext.networkInputTracker",
       "org.apache.spark.streaming.StreamingContext.checkpointDir",
       "org.apache.spark.streaming.StreamingContext.checkpointDuration",
       "org.apache.spark.streaming.StreamingContext.receiverJobThread",