diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 73827deedaaf0bd652a8555e2bd4446234f111eb..964ec26498404788794ee4510137f608a2ea6184 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -47,14 +47,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
   def combineByKey[C](createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C,
-                      numSplits: Int)
+                      numSplits: Int,
+                      partitioner: Partitioner)
   : RDD[(K, C)] = {
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    val partitioner = new HashPartitioner(numSplits)
     new ShuffledRDD(self, aggregator, partitioner)
   }
 
+  def combineByKey[C](createCombiner: V => C,
+                      mergeValue: (C, V) => C,
+                      mergeCombiners: (C, C) => C,
+                      numSplits: Int)
+  : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits,
+                 new HashPartitioner(numSplits))
+  }
+
   def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
     combineByKey[V]((v: V) => v, func, func, numSplits)
   }
 