Skip to content
Snippets Groups Projects
Commit 0e84fee7 authored by Reynold Xin's avatar Reynold Xin
Browse files

Removed the mapSideCombine option in partitionBy.

parent 10af952a
No related branches found
No related tags found
No related merge requests found
@@ -233,31 +233,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   }

   /**
-   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
-   * is true, Spark will group values of the same key together on the map side before the
-   * repartitioning, to only send each key over the network once. If a large number of
-   * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
-   * be set to true.
+   * Return a copy of the RDD partitioned using the specified partitioner.
    */
-  def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
-    if (getKeyClass().isArray) {
-      if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array keys.")
-      }
-      if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("Default partitioner cannot partition array keys.")
-      }
-    }
-    if (mapSideCombine) {
-      def createCombiner(v: V) = ArrayBuffer(v)
-      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-      def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
-      val bufs = combineByKey[ArrayBuffer[V]](
-        createCombiner _, mergeValue _, mergeCombiners _, partitioner)
-      bufs.flatMapValues(buf => buf)
-    } else {
-      new ShuffledRDD[K, V](self, partitioner)
-    }
+  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    new ShuffledRDD[K, V](self, partitioner)
   }

   /**
...
@@ -253,11 +253,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     fromRDD(rdd.subtract(other, p))

   /**
-   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
-   * is true, Spark will group values of the same key together on the map side before the
-   * repartitioning, to only send each key over the network once. If a large number of
-   * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
-   * be set to true.
+   * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
     fromRDD(rdd.partitionBy(partitioner))
...
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.