diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index fd00183668b6174cb39d19add5715607eb2081f9..354ab8ae5d7d5c425cd3b62ea689554cc56b3294 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -157,19 +157,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(partitions2(0).length > 0)
     assert(partitions2(19).length > 0)
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
-
-    // Coalesce partitions - no shuffle
-    val repartitioned3 = data.repartition(2, skipShuffle = true)
-    assert(repartitioned3.partitions.size == 2)
-    val partitions3 = repartitioned3.glom().collect()
-    assert(partitions3(0).toList === (1 to 500).toList)
-    assert(partitions3(1).toList === (501 to 1000).toList)
-    assert(repartitioned3.collect().toSet === (1 to 1000).toSet)
-
-    // Split partitions - no shuffle (should throw exn)
-    intercept[IllegalArgumentException] {
-      data.repartition(20, skipShuffle = true)
-    }
   }
 
   test("coalesced RDDs") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 11586f72b6ddb081be0e4bc9e29f989ada9e7876..55cfcb371ab3af0b6ebcb820322763ce819f3c03 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -82,6 +82,44 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(input, operation, output, true)
   }
 
+  test("repartition (more partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(5)
+    val ssc = setupStreams(input, operation, 2)
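+    // runStreamsWithPartitions returns each batch's output as one Seq per partition,
+    // so the partition counts can be asserted on directly below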
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 5)
+    assert(second.size === 5)
+    assert(third.size === 5)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
+  test("repartition (fewer partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(2)
+    val ssc = setupStreams(input, operation, 5)
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 2)
+    assert(second.size === 2)
+    assert(third.size === 2)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
   test("groupByKey") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 37dd9c4cc61d29d28f247bbdf9044b80e8fce76b..26f515a778c3687860451dcaf2a69799783f12d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -60,6 +60,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
 /**
  * This is a output stream just for the testsuites. All the output is collected into a
  * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each represented as a sequence of items.
  */
 class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
@@ -75,6 +77,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
   }
 }
 
+/**
+ * This is an output stream just for the testsuites. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each represented as a sequence of partitions, each
+ * of which is a sequence of items.
+ */
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
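+    // glom() turns each partition into an array, preserving the RDD's partitioning in the output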
+    val collected = rdd.glom().collect().map(_.toSeq)
+    output += collected
+  }) {
+
+  // This clears the output buffer every time it is read from a checkpoint
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream) {
+    ois.defaultReadObject()
+    output.clear()
+  }
+}
+
 /**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run user-defined set of input on user-defined stream operations, and verify the output.
@@ -108,7 +131,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
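+   * @param numPartitions Number of partitions of the input stream (defaults to numInputPartitions)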
    */
   def setupStreams[U: ClassManifest, V: ClassManifest](
       input: Seq[Seq[U]],
-      operation: DStream[U] => DStream[V]
+      operation: DStream[U] => DStream[V],
+      numPartitions: Int = numInputPartitions
     ): StreamingContext = {
 
     // Create StreamingContext
@@ -118,9 +142,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     }
 
     // Setup the stream computation
-    val inputStream = new TestInputStream(ssc, input, numInputPartitions)
+    val inputStream = new TestInputStream(ssc, input, numPartitions)
     val operatedStream = operation(inputStream)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
     ssc.registerInputStream(inputStream)
     ssc.registerOutputStream(outputStream)
     ssc
@@ -146,7 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
     val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
     val operatedStream = operation(inputStream1, inputStream2)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
     ssc.registerInputStream(inputStream1)
     ssc.registerInputStream(inputStream2)
     ssc.registerOutputStream(outputStream)
@@ -157,18 +183,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
    * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
    * returns the collected output. It will wait until `numExpectedOutput` number of
    * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a single flattened sequence of items for each RDD.
    */
   def runStreams[V: ClassManifest](
       ssc: StreamingContext,
       numBatches: Int,
       numExpectedOutput: Int
     ): Seq[Seq[V]] = {
+    // Flatten each RDD into a single Seq
+    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+  }
+
+  /**
+   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
+   * returns the collected output. It will wait until `numExpectedOutput` number of
+   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a sequence of RDDs. Each RDD is represented as several sequences of items, each
+   * representing one partition.
+   */
+  def runStreamsWithPartitions[V: ClassManifest](
+      ssc: StreamingContext,
+      numBatches: Int,
+      numExpectedOutput: Int
+    ): Seq[Seq[Seq[V]]] = {
     assert(numBatches > 0, "Number of batches to run stream computation is zero")
     assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
 
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
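+    // setupStreams always registers a TestOutputStreamWithPartitions, so this cast is safe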
+    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output
 
     try {