diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d4f74d3e1854344af2186d063d80090760b4bf40..6cc608ea5bc690ab91a91fb482beddd817a95843 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,9 +352,8 @@ private[spark] class TaskSchedulerImpl(
       taskResultGetter.stop()
     }
 
-    // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
-    // TODO: Do something better !
-    Thread.sleep(5000L)
+    // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
+    Thread.sleep(1000L)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1ec4492bcab9b3c7cc4125dcc64f1d30672b1b68..a493a8279f94218318be45b7bdc1ee372209dc64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
   }
 
-
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream. However, the reduction is done incrementally
@@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   }
 
   /**
-   * Enable periodic checkpointing of RDDs of this DStream
+   * Enable periodic checkpointing of RDDs of this DStream.
    * @param interval Time interval after which generated RDD will be checkpointed
    */
   def checkpoint(interval: Duration) = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 9dfcc08abea95a28549dd8d5f8b9f1f27629d685..426f61e24afbd69a14fc7a03dfd5c2db4fdf3914 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -340,6 +340,10 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def clearMetadata(time: Time) {
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
+    if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+      logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+      oldRDDs.values.foreach(_.unpersist(false))
+    }
     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearMetadata(time))
@@ -760,7 +764,12 @@ abstract class DStream[T: ClassTag] (
     this.foreachRDD(saveFunc)
   }
 
-  def register() {
+  /**
+   * Register this streaming as an output stream. This would ensure that RDDs of this
+   * DStream will be generated.
+   */
+  def register(): DStream[T] = {
     ssc.registerOutputStream(this)
+    this
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 2da4127f47f142d882ba6e19c10b6c40b5f84306..38bad5ac8042ade004d3b2d11a755ba670de80ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
             }
         }
       case None =>
-        logInfo("Nothing to delete")
+        logDebug("Nothing to delete")
     }
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234208a3bc387829287513dabc4a9f6c9..b73edf81d4273057af540e87b76898a8ef5769c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
 
 import util.ManualClock
 import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
@@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
-  test("forgetting of RDDs - map and window operations") {
-    assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+  val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
-    val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+  test("rdd cleanup - map and window") {
     val rememberDuration = Seconds(3)
-
-    assert(input.size === 10, "Number of inputs have changed")
-
     def operation(s: DStream[Int]): DStream[(Int, Int)] = {
       s.map(x => (x % 10, 1))
        .window(Seconds(2), Seconds(1))
        .window(Seconds(4), Seconds(2))
     }
 
-    val ssc = setupStreams(input, operation _)
-    ssc.remember(rememberDuration)
-    runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
-    val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
-    val windowedStream1 = windowedStream2.dependencies.head
+    val operatedStream = runCleanupTest(conf, operation _,
+      numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+    val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+    val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
     val mappedStream = windowedStream1.dependencies.head
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    assert(clock.time === Seconds(10).milliseconds)
-
-    // IDEALLY
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
-    // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3,
+    // Checkpoint remember durations
+    assert(windowedStream2.rememberDuration === rememberDuration)
+    assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+    assert(mappedStream.rememberDuration ===
+      rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
 
-    // IN THIS TEST
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
+    // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
     // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
+    // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
 
     // WindowedStream2
     assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(mappedStream.generatedRDDs.contains(Time(2000)))
     assert(!mappedStream.generatedRDDs.contains(Time(1000)))
   }
+
+  test("rdd cleanup - updateStateByKey") {
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+    }
+    val stateStream = runCleanupTest(
+      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+    assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+    assert(stateStream.generatedRDDs.contains(Time(10000)))
+    assert(!stateStream.generatedRDDs.contains(Time(4000)))
+  }
+
+  /** Test cleanup of RDDs in DStream metadata */
+  def runCleanupTest[T: ClassTag](
+      conf2: SparkConf,
+      operation: DStream[Int] => DStream[T],
+      numExpectedOutput: Int = cleanupTestInput.size,
+      rememberDuration: Duration = null
+    ): DStream[T] = {
+
+    // Setup the stream computation
+    assert(batchDuration === Seconds(1),
+      "Batch duration has changed from 1 second, check cleanup tests")
+    val ssc = setupStreams(cleanupTestInput, operation)
+    val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+    if (rememberDuration != null) ssc.remember(rememberDuration)
+    val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    assert(clock.time === Seconds(10).milliseconds)
+    assert(output.size === numExpectedOutput)
+    operatedStream
+  }
 }