From 0e84fee76b529089fb52f15151202e9a7b847ed5 Mon Sep 17 00:00:00 2001
From: Reynold Xin <reynoldx@gmail.com>
Date: Sat, 17 Aug 2013 21:13:41 -0700
Subject: [PATCH] Removed the mapSideCombine option in partitionBy.

---
 .../main/scala/spark/PairRDDFunctions.scala   | 28 ++++---------------
 .../scala/spark/api/java/JavaPairRDD.scala    |  6 +---
 2 files changed, 6 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 6b0cc2fbf1..fa9df3a97e 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -233,31 +233,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   }
 
   /**
-   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
-   * is true, Spark will group values of the same key together on the map side before the
-   * repartitioning, to only send each key over the network once. If a large number of
-   * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
-   * be set to true.
+   * Return a copy of the RDD partitioned using the specified partitioner.
    */
-  def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
-    if (getKeyClass().isArray) {
-      if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array keys.")
-      }
-      if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("Default partitioner cannot partition array keys.")
-      }
-    }
-    if (mapSideCombine) {
-      def createCombiner(v: V) = ArrayBuffer(v)
-      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-      def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
-      val bufs = combineByKey[ArrayBuffer[V]](
-        createCombiner _, mergeValue _, mergeCombiners _, partitioner)
-      bufs.flatMapValues(buf => buf)
-    } else {
-      new ShuffledRDD[K, V](self, partitioner)
+  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
     }
+    new ShuffledRDD[K, V](self, partitioner)
   }
 
   /**
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index ff12e8b76c..c2995b836a 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -253,11 +253,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(rdd.subtract(other, p))
 
   /**
-   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
-   * is true, Spark will group values of the same key together on the map side before the
-   * repartitioning, to only send each key over the network once. If a large number of
-   * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
-   * be set to true.
+   * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
     fromRDD(rdd.partitionBy(partitioner))
--
GitLab
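
Note: the removed `mapSideCombine = true` branch was just a combineByKey into ArrayBuffer
buffers followed by flatMapValues, so callers that relied on it can reproduce the same
behavior against the new API. Below is a minimal sketch, assuming the Spark 0.7-era
`spark` package layout shown in this patch; the names `sc`, `pairs`, `part`, and
`MapSideCombineSketch` are illustrative, not part of this change.

import scala.collection.mutable.ArrayBuffer

import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

object MapSideCombineSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "map-side-combine-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val part = new HashPartitioner(2)

    // Build per-key buffers on the map side so each key crosses the network
    // once, then flatten back to (K, V) pairs. These are the same steps the
    // removed branch of partitionBy performed internally.
    val repartitioned = pairs
      .combineByKey[ArrayBuffer[Int]](
        (v: Int) => ArrayBuffer(v),
        (buf: ArrayBuffer[Int], v: Int) => buf += v,
        (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2,
        part)
      .flatMapValues(buf => buf)

    repartitioned.collect().foreach(println)
    sc.stop()
  }
}

Whether this buffering actually helps depends on key duplication: with few repeated
keys it only adds ArrayBuffer overhead, which is consistent with making plain
partitionBy a bare shuffle and leaving combining to combineByKey-based operators.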