Skip to content
Snippets Groups Projects
Commit 12d59312 authored by Matei Zaharia
Browse files

Create fewer function objects in uses of AppendOnlyMap.changeValue

parent 0b35051f
No related branches found
No related tags found
No related merge requests found
...@@ -33,28 +33,26 @@ case class Aggregator[K, V, C] ( ...@@ -33,28 +33,26 @@ case class Aggregator[K, V, C] (
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new AppendOnlyMap[K, C] val combiners = new AppendOnlyMap[K, C]
for ((k, v) <- iter) { var kv: Product2[K, V] = null
combiners.changeValue(k, (hadValue, oldValue) => { val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
mergeValue(oldValue, v) }
} else { while (iter.hasNext) {
createCombiner(v) kv = iter.next()
} combiners.changeValue(kv._1, update)
})
} }
combiners.iterator combiners.iterator
} }
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new AppendOnlyMap[K, C] val combiners = new AppendOnlyMap[K, C]
for ((k, c) <- iter) { var kc: (K, C) = null
combiners.changeValue(k, (hadValue, oldValue) => { val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) { if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
mergeCombiners(oldValue, c) }
} else { while (iter.hasNext) {
c kc = iter.next()
} combiners.changeValue(kc._1, update)
})
} }
combiners.iterator combiners.iterator
} }
......
...@@ -106,10 +106,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: ...@@ -106,10 +106,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = { val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
map.changeValue(k, (hadValue, oldValue) => { if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
if (hadValue) oldValue else Array.fill(numRdds)(new ArrayBuffer[Any]) }
})
val getSeq = (k: K) => {
map.changeValue(k, update)
} }
val ser = SparkEnv.get.serializerManager.get(serializerClass) val ser = SparkEnv.get.serializerManager.get(serializerClass)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment