From 2911a783d692ef218d4f62ec27884005e047d22c Mon Sep 17 00:00:00 2001
From: Ankur Dave <ankurdave@gmail.com>
Date: Sun, 9 Oct 2011 15:47:20 -0700
Subject: [PATCH] Add custom partitioner support to PairRDDFunctions.combineByKey

---
 core/src/main/scala/spark/PairRDDFunctions.scala | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 73827deeda..964ec26498 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -47,14 +47,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
   def combineByKey[C](createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C,
-                      numSplits: Int)
+                      numSplits: Int,
+                      partitioner: Partitioner)
   : RDD[(K, C)] = {
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    val partitioner = new HashPartitioner(numSplits)
     new ShuffledRDD(self, aggregator, partitioner)
   }
 
+  def combineByKey[C](createCombiner: V => C,
+                      mergeValue: (C, V) => C,
+                      mergeCombiners: (C, C) => C,
+                      numSplits: Int)
+  : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits,
+      new HashPartitioner(numSplits))
+  }
+
   def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
     combineByKey[V]((v: V) => v, func, func, numSplits)
   }
 
-- 
GitLab
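
A minimal usage sketch (not part of the patch) of what the new overload enables,
assuming the 2011-era spark API that the diff targets: spark.Partitioner with
numPartitions/getPartition, and the implicit conversion from RDD[(K, V)] to
PairRDDFunctions being in scope via `import spark.SparkContext._`.
FirstCharPartitioner is a hypothetical class written only to illustrate passing
a custom Partitioner; before this patch, combineByKey always built a
HashPartitioner internally.

    import spark.{Partitioner, SparkContext}
    import spark.SparkContext._  // assumed location of the RDD => PairRDDFunctions implicit

    // Hypothetical partitioner: routes each key by its first character,
    // so keys sharing a first letter land in the same partition.
    class FirstCharPartitioner(splits: Int) extends Partitioner {
      def numPartitions: Int = splits
      def getPartition(key: Any): Int = {
        val s = key.toString
        if (s.isEmpty) 0 else math.abs(s.charAt(0).toInt) % splits
      }
    }

    object CombineByKeyExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "combineByKey example")
        val pairs = sc.parallelize(Seq(("apple", 1), ("avocado", 2), ("banana", 3)), 2)

        // The new five-argument overload accepts any Partitioner; the old
        // four-argument signature keeps its behavior by delegating with
        // new HashPartitioner(numSplits).
        val sums = pairs.combineByKey[Int](
          (v: Int) => v,                  // createCombiner
          (c: Int, v: Int) => c + v,      // mergeValue
          (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
          2,                              // numSplits
          new FirstCharPartitioner(2))

        println(sums.collect().toList)
      }
    }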