Skip to content
Snippets Groups Projects
Commit 2911a783 authored by Ankur Dave
Browse files

Add custom partitioner support to PairRDDFunctions.combineByKey

parent 6c6e47e3
No related branches found
No related tags found
No related merge requests found
......@@ -47,14 +47,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
/**
 * Builds a shuffled RDD of (key, combined value) pairs from this RDD, using a
 * caller-supplied partitioner for the shuffle.
 *
 * The three functions are handed unchanged to an `Aggregator[K, V, C]`; going
 * by their types and names they presumably create a combiner from a first
 * value, fold a further value into a combiner, and merge two combiners
 * (confirm against `Aggregator`).
 *
 * @param createCombiner function of type `V => C` forwarded to the aggregator
 * @param mergeValue     function of type `(C, V) => C` forwarded to the aggregator
 * @param mergeCombiners function of type `(C, C) => C` forwarded to the aggregator
 * @param numSplits      not read by this method's body; the partitioner alone is
 *                       passed to the shuffle. Kept in the signature so the
 *                       sibling overload can forward it.
 * @param partitioner    partitioner passed to the `ShuffledRDD`
 * @return a `ShuffledRDD` over `self` built with the aggregator and `partitioner`
 */
def combineByKey[C](createCombiner: V => C,
                    mergeValue: (C, V) => C,
                    mergeCombiners: (C, C) => C,
                    numSplits: Int,
                    partitioner: Partitioner)
: RDD[(K, C)] =
{
  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  // Use the partitioner supplied by the caller; do not rebuild a HashPartitioner
  // here, or the custom partitioner would be silently ignored.
  new ShuffledRDD(self, aggregator, partitioner)
}
/**
 * Convenience overload of `combineByKey` that does not take a partitioner:
 * it constructs a `HashPartitioner` from `numSplits` and delegates to the
 * five-argument form.
 *
 * @param createCombiner function of type `V => C`, forwarded unchanged
 * @param mergeValue     function of type `(C, V) => C`, forwarded unchanged
 * @param mergeCombiners function of type `(C, C) => C`, forwarded unchanged
 * @param numSplits      passed to the `HashPartitioner` constructor and
 *                       forwarded to the five-argument overload
 * @return whatever the five-argument `combineByKey` returns
 */
def combineByKey[C](createCombiner: V => C,
                    mergeValue: (C, V) => C,
                    mergeCombiners: (C, C) => C,
                    numSplits: Int): RDD[(K, C)] = {
  val hashPartitioner = new HashPartitioner(numSplits)
  combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits, hashPartitioner)
}
/**
 * Reduces the values of each key with `func` by delegating to
 * `combineByKey[V]`: the initial combiner is the value itself, and `func`
 * serves as both the value-merge and the combiner-merge function.
 *
 * @param func      binary function of type `(V, V) => V` used for both merge steps
 * @param numSplits forwarded to `combineByKey`
 * @return the RDD produced by `combineByKey`
 */
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
  // The combiner type equals the value type, so the first value is used as-is.
  val keepValue: V => V = (v: V) => v
  combineByKey[V](keepValue, func, func, numSplits)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment