diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0c1b667c0ab42fa0393dc83d0c688a47fe8bddb9..6abec9e6bec0b279a9363d8de57704a1742d10ea 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = {
-    this.window(windowDuration, slideDuration).reduce(reduceFunc)
+    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
   }
 
   def reduceByWindow(
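The reduceByWindow change above pre-reduces each batch before windowing, so only one
already-reduced element per batch is carried into the window-level reduce. A minimal
usage sketch, assuming an existing DStream[String] named `lines` and illustrative
30s/10s durations (neither is part of this patch):

  import spark.streaming.{DStream, Seconds}

  // Assumed: the batch interval of `lines` divides both the window (30s)
  // and slide (10s) durations, as reduceByWindow requires.
  def windowedLengthSum(lines: DStream[String]): DStream[Int] =
    lines.map(_.length).reduceByWindow(_ + _, Seconds(30), Seconds(10))
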
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf06112676d78c1ebc04a982a09145c379afdd..021ff83b3655f17f5f4423dc48b1ebde45c4ead9 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -137,7 +137,8 @@ extends Serializable {
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
+   * @param numPartitions  number of partitions of each RDD in the new DStream; if not specified
+   *                       then Spark's default number of partitions will be used
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -155,7 +156,7 @@ extends Serializable {
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -213,7 +214,7 @@ extends Serializable {
    * @param numPartitions  Number of partitions of each RDD in the new DStream.
    */
   def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V, 
+      reduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
@@ -230,7 +231,8 @@ extends Serializable {
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD
+   *                       in the new DStream.
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
@@ -245,7 +247,7 @@ extends Serializable {
   }
 
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -253,81 +255,64 @@ extends Serializable {
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      invReduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[(K, V)] = {
-
-    reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
-  }
-
-  /**
-   * Create a new DStream by reducing over a using incremental computation.
-   * The reduced value of over a new window is calculated using the old window's reduce value :
-   *  1. reduce the new values that entered the window (e.g., adding new counts)
-   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
-   * However, it is applicable to only "invertible reduce functions".
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
+   * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
+   * @param filterFunc     optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
       windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int
+      slideDuration: Duration = self.slideDuration,
+      numPartitions: Int = ssc.sc.defaultParallelism,
+      filterFunc: ((K, V)) => Boolean = null
     ): DStream[(K, V)] = {
 
     reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+      reduceFunc, invReduceFunc, windowDuration,
+      slideDuration, defaultPartitioner(numPartitions), filterFunc
+    )
   }
 
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
+   * @param reduceFunc     associative reduce function
+   * @param invReduceFunc  inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param filterFunc     optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration,
-      partitioner: Partitioner
+      partitioner: Partitioner,
+      filterFunc: ((K, V)) => Boolean
     ): DStream[(K, V)] = {
 
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
     new ReducedWindowedDStream[K, V](
-      self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
+      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+      windowDuration, slideDuration, partitioner
+    )
   }
 
   /**
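As a usage sketch of the new default-argument signature, incremental windowed word counts
with a filter that drops keys whose count has fallen to 0 (the input stream `words` and the
60s/10s durations are assumed for illustration; the call shape mirrors the one used in
WindowOperationsSuite below):

  import spark.streaming.{DStream, Seconds}
  import spark.streaming.StreamingContext._  // implicit conversion to PairDStreamFunctions

  // Assumed: `words` is an existing DStream[String] whose batch interval divides 60s and 10s.
  def windowedWordCounts(words: DStream[String]): DStream[(String, Int)] = {
    val keepNonZero = (kv: (String, Int)) => kv._2 != 0  // drop keys whose count reached 0
    words.map(w => (w, 1)).reduceByKeyAndWindow(
      _ + _,         // reduce: add counts entering the window
      _ - _,         // inverse reduce: subtract counts leaving the window
      Seconds(60),   // window duration
      Seconds(10),   // slide duration
      filterFunc = keepNonZero
    )
  }

Without filterFunc, zero-count keys would stay in the window state indefinitely, which is
the unbounded-growth problem described in the WindowOperationsSuite comment.
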
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091caaae66b352c0a8d24b86fe14240e215..4d3e0d03040e1bcfe0cb3a838393bee7d28b3f80 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -328,7 +328,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -342,25 +342,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
+   * @param filterFunc     function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained;
+   *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
-      numPartitions: Int
+      numPartitions: Int,
+      filterFunc: JFunction[(K, V), java.lang.Boolean]
     ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(
         reduceFunc,
         invReduceFunc,
         windowDuration,
         slideDuration,
-        numPartitions)
+        numPartitions,
+        (p: (K, V)) => filterFunc(p).booleanValue()
+    )
   }
 
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
@@ -374,20 +380,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param filterFunc     function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained;
+   *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
-      partitioner: Partitioner
-    ): JavaPairDStream[K, V] = {
+      partitioner: Partitioner,
+      filterFunc: JFunction[(K, V), java.lang.Boolean]
+    ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(
         reduceFunc,
         invReduceFunc,
         windowDuration,
         slideDuration,
-        partitioner)
+        partitioner,
+        (p: (K, V)) => filterFunc(p).booleanValue()
+    )
   }
 
   /**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25271cbd741083dd392590bcb176298b..aa5a71e1ed7005f1c0c74d4020e975c61710abd2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -3,7 +3,7 @@ package spark.streaming.dstream
 import spark.streaming.StreamingContext._
 
 import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
 import spark.Partitioner
 import spark.SparkContext._
 import spark.storage.StorageLevel
@@ -15,7 +15,8 @@ private[streaming]
 class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
-    invReduceFunc: (V, V) => V, 
+    invReduceFunc: (V, V) => V,
+    filterFunc: Option[((K, V)) => Boolean],
     _windowDuration: Duration,
     _slideDuration: Duration,
     partitioner: Partitioner
@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     //
 
     // Get the RDDs of the reduced values in "old time steps"
-    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+    val oldRDDs =
+      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
     logDebug("# old RDDs = " + oldRDDs.size)
 
     // Get the RDDs of the reduced values in "new time steps"
-    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+    val newRDDs =
+      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
     logDebug("# new RDDs = " + newRDDs.size)
 
     // Get the RDD of the reduced value of the previous window
-    val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+    val previousWindowRDD =
+      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
 
     // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
 
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
-    //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+    val cogroupedRDD =
+      new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
 
     val numOldValues = oldRDDs.size
     val numNewValues = newRDDs.size
@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
       // Getting reduced values "old time steps" that will be removed from current window
       val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
       // Getting reduced values "new time steps"
-      val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+      val newValues =
+        (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
       if (seqOfValues(0).isEmpty) {
         // If previous window's reduce value does not exist, then at least new values should exist
         if (newValues.isEmpty) {
@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
 
     val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
 
-    Some(mergedValuesRDD)
+    if (filterFunc.isDefined) {
+      Some(mergedValuesRDD.filter(filterFunc.get))
+    } else {
+      Some(mergedValuesRDD)
+    }
   }
-
-
 }
 
 
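Per key, the compute path above amounts to: take the previous window's reduced value,
"inverse reduce" the per-batch values that slid out, then reduce the per-batch values that
slid in. A simplified, non-Spark sketch with assumed counts (the real code performs this
over the cogrouped Seq[Seq[V]] values):

  val reduce: (Int, Int) => Int = _ + _     // e.g. adding counts
  val invReduce: (Int, Int) => Int = _ - _  // e.g. subtracting counts

  val previousWindowValue = 10              // reduced value of the previous window for one key
  val oldValues = Seq(2, 1)                 // per-batch values that slid out of the window
  val newValues = Seq(3)                    // per-batch values that slid into the window

  // 10 - 2 - 1 + 3 = 10: the reduced value of the current window for this key
  val currentWindowValue =
    newValues.foldLeft(oldValues.foldLeft(previousWindowValue)(invReduce))(reduce)

When a filterFunc is supplied, it is applied to the merged (key, value) pairs afterwards,
so keys whose merged value fails the predicate (for example, a count of 0) are dropped from
the window state rather than carried forward.
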
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 3ffe4b64d08735904e32c9605f8133043e6260ff..83d8591a3a3dc7107dffd33c706548a5a647089f 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -291,7 +291,6 @@ class TestOutputStream[T: ClassManifest](
     (rdd: RDD[T], t: Time) => {
       val collected = rdd.collect()
       output += collected
-      println(t + ": " + collected.mkString("[", ",", "]"))
     }
   ) {
 
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 5652596e1e4b9abc345c58b703f56f43181579a8..f0638e0e027f21576211e45b451c1dae31446b2f 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,7 +1,7 @@
 # Set everything to be logged to the file streaming/target/unit-tests.log 
 log4j.rootCategory=WARN, file
 # log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
 log4j.appender.file.file=streaming/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 10807901475e4fa34667d2c4b92c6a94b64be3b3..e6ac7b35aa9a08cf16d0cf9ef27ce6273fb2810c 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -84,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
   )
 
   /*
-  The output of the reduceByKeyAndWindow with inverse reduce function is
-  different from the naive reduceByKeyAndWindow. Even if the count of a
-  particular key is 0, the key does not get eliminated from the RDDs of
-  ReducedWindowedDStream. This causes the number of keys in these RDDs to
-  increase forever. A more generalized version that allows elimination of
-  keys should be considered.
+  The output of reduceByKeyAndWindow with an inverse function but without a filter
+  function will differ from that of the naive reduceByKeyAndWindow, as no keys are
+  eliminated from the ReducedWindowedDStream even when the value of a key drops to 0.
   */
 
   val bigReduceInvOutput = Seq(
@@ -177,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
 
   // Testing reduceByKeyAndWindow (with invertible reduce function)
 
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "basic reduction",
     Seq(Seq(("a", 1), ("a", 3)) ),
     Seq(Seq(("a", 4)) )
   )
 
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "key already in window and new value added into window",
     Seq( Seq(("a", 1)), Seq(("a", 1)) ),
     Seq( Seq(("a", 1)), Seq(("a", 2)) )
   )
 
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "new key added into window",
     Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
     Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
   )
 
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "key removed from window",
     Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
     Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
   )
 
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "larger slide time",
     largerSlideInput,
     largerSlideReduceOutput,
@@ -209,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(2)
   )
 
-  testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+  testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+  testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)
 
   test("groupByKeyAndWindow") {
     val input = bigInput
@@ -276,27 +275,45 @@ class WindowOperationsSuite extends TestSuiteBase {
     test("reduceByKeyAndWindow - " + name) {
       val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+        s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
       }
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
   }
 
-  def testReduceByKeyAndWindowInv(
+  def testReduceByKeyAndWindowWithInverse(
     name: String,
     input: Seq[Seq[(String, Int)]],
     expectedOutput: Seq[Seq[(String, Int)]],
     windowDuration: Duration = Seconds(2),
     slideDuration: Duration = Seconds(1)
   ) {
-    test("reduceByKeyAndWindowInv - " + name) {
+    test("ReduceByKeyAndWindow with inverse function - " + name) {
       val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
         s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
-         .persist()
          .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
       }
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
   }
+
+  def testReduceByKeyAndWindowWithFilteredInverse(
+      name: String,
+      input: Seq[Seq[(String, Int)]],
+      expectedOutput: Seq[Seq[(String, Int)]],
+      windowDuration: Duration = Seconds(2),
+      slideDuration: Duration = Seconds(1)
+    ) {
+    test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+      val filterFunc = (p: (String, Int)) => p._2 != 0
+      val operation = (s: DStream[(String, Int)]) => {
+        s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+          .persist()
+          .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+      }
+      testOperation(input, operation, expectedOutput, numBatches, true)
+    }
+  }
 }
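
For reference, the behavioral difference the new filtered test covers, spelled out for the
"key removed from window" input above (window 2s, slide 1s, batch 1s); the filtered column
is hypothetical, following the same logic the "big test" pair exercises:

  val input       = Seq(Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq())
  // Inverse function only: "a" lingers with a zero count after it leaves the window.
  val inverseOnly = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)))
  // Inverse + filterFunc (_._2 != 0): the zero-count key is dropped instead.
  val withFilter  = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq())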