Skip to content
Snippets Groups Projects
Commit 5945bcdc authored by Reynold Xin's avatar Reynold Xin
Browse files

Added a new flag in Aggregator to indicate applying map side combiners.

parent c68e820b
No related branches found
No related tags found
No related merge requests found
package spark package spark
/** A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
* @param mapSideCombine whether to apply combiners on map partitions, also
* known as map-side aggregations. When set to false,
* mergeCombiners function is not used.
*/
class Aggregator[K, V, C] ( class Aggregator[K, V, C] (
val createCombiner: V => C, val createCombiner: V => C,
val mergeValue: (C, V) => C, val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C) val mergeCombiners: (C, C) => C,
val mapSideCombine: Boolean = true)
extends Serializable extends Serializable
...@@ -29,10 +29,9 @@ class ShuffledRDD[K, V, C]( ...@@ -29,10 +29,9 @@ class ShuffledRDD[K, V, C](
val combiners = new JHashMap[K, C] val combiners = new JHashMap[K, C]
val fetcher = SparkEnv.get.shuffleFetcher val fetcher = SparkEnv.get.shuffleFetcher
if (aggregator.mergeCombiners != null) { if (aggregator.mapSideCombine) {
// If mergeCombiners is specified, combiners are applied on the map // Apply combiners on map partitions. In this case, post-shuffle we get a
// partitions. In this case, post-shuffle we get a list of outputs from // list of outputs from the combiners and merge them using mergeCombiners.
// the combiners and merge them using mergeCombiners.
def mergePairWithMapSideCombiners(k: K, c: C) { def mergePairWithMapSideCombiners(k: K, c: C) {
val oldC = combiners.get(k) val oldC = combiners.get(k)
if (oldC == null) { if (oldC == null) {
...@@ -43,9 +42,9 @@ class ShuffledRDD[K, V, C]( ...@@ -43,9 +42,9 @@ class ShuffledRDD[K, V, C](
} }
fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners) fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners)
} else { } else {
// If mergeCombiners is not specified, no combiner is applied on the map // Do not apply combiners on map partitions (i.e. map side aggregation is
// partitions (i.e. map side aggregation is turned off). Post-shuffle we // turned off). Post-shuffle we get a list of values and we use mergeValue
// get a list of values and we use mergeValue to merge them. // to merge them.
def mergePairWithoutMapSideCombiners(k: K, v: V) { def mergePairWithoutMapSideCombiners(k: K, v: V) {
val oldC = combiners.get(k) val oldC = combiners.get(k)
if (oldC == null) { if (oldC == null) {
......
...@@ -108,7 +108,7 @@ class ShuffleMapTask( ...@@ -108,7 +108,7 @@ class ShuffleMapTask(
val partitioner = dep.partitioner val partitioner = dep.partitioner
val bucketIterators = val bucketIterators =
if (aggregator.mergeCombiners != null) { if (aggregator.mapSideCombine) {
// Apply combiners (map-side aggregation) to the map output. // Apply combiners (map-side aggregation) to the map output.
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any]) val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
for (elem <- rdd.iterator(split)) { for (elem <- rdd.iterator(split)) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment