From 1c67c7dfd1660dd39226742eedcb2948ab1445d0 Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Fri, 22 Mar 2013 08:54:44 -0500
Subject: [PATCH] Add a shuffle parameter to coalesce.

This is useful when you want just 1 output file (part-00000) but
still want the upstream RDD to be computed in parallel.
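
As a hypothetical usage sketch (`sc` is an existing SparkContext and
`expensiveComputation` is a placeholder function), the upstream map below
still runs across ten tasks, and only the final write is funneled into a
single partition:

    val mapped = sc.parallelize(1 to 1000, 10).map(expensiveComputation)
    // shuffle = true inserts a shuffle step, so the map above is not
    // collapsed into the single output task
    mapped.coalesce(1, shuffle = true).saveAsTextFile("out")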
---
 core/src/main/scala/spark/RDD.scala                    | 10 +++++++++-
 core/src/main/scala/spark/api/java/JavaDoubleRDD.scala |  6 ++++++
 core/src/main/scala/spark/api/java/JavaPairRDD.scala   |  8 +++++++-
 core/src/main/scala/spark/api/java/JavaRDD.scala       |  6 ++++++
 core/src/test/scala/spark/RDDSuite.scala               |  6 +++++-
 5 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9bd8a0f98d..0cd904f89d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
 import spark.rdd.MapPartitionsWithIndexRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
 import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+    if (shuffle) {
+      // include a shuffle step so that our upstream tasks are still distributed
+      new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+    } else {
+      new CoalescedRDD(this, numPartitions)
+    }
+  }
 
   /**
    * Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..26cbb1a641 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -57,6 +57,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    */
   def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
 
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions. If `shuffle` is true, a shuffle step is added so that upstream tasks stay distributed.
+   */
+  def coalesce(numPartitions: Int, shuffle: java.lang.Boolean): JavaDoubleRDD =
+    fromRDD(srdd.coalesce(numPartitions, shuffle))
+
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    * 
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..9ce77e8e84 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    */
-  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions. If `shuffle` is true, a shuffle step is added so that upstream tasks stay distributed.
+   */
+  def coalesce(numPartitions: Int, shuffle: java.lang.Boolean): JavaPairRDD[K, V] =
+    fromRDD(rdd.coalesce(numPartitions, shuffle))
 
   /**
    * Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..7223dbbe64 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -43,6 +43,12 @@ JavaRDDLike[T, JavaRDD[T]] {
    */
   def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
 
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions. If `shuffle` is true, a shuffle step is added so that upstream tasks stay distributed.
+   */
+  def coalesce(numPartitions: Int, shuffle: java.lang.Boolean): JavaRDD[T] =
+    rdd.coalesce(numPartitions, shuffle)
+
   /**
    * Return a sampled subset of this RDD.
    */
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 9739ba869b..bcbb472f6c 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, PartitionPruningRDD, ShuffledRDD}
 
 class RDDSuite extends FunSuite with LocalSparkContext {
 
@@ -154,6 +154,10 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(coalesced4.collect().toList === (1 to 10).toList)
     assert(coalesced4.glom().collect().map(_.toList).toList ===
       (1 to 10).map(x => List(x)).toList)
+
+    // we can optionally shuffle to keep the upstream computation parallel
+    val coalesced5 = data.coalesce(1, shuffle = true)
+    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.isInstanceOf[ShuffledRDD[_, _]])
   }
 
   test("zipped RDDs") {
-- 
GitLab