Skip to content
Snippets Groups Projects
Commit 9cfa0683 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #450 from stephenh/inlinemergepair

Inline mergePair to look more like the narrow dep branch.
parents 03eefbb2 8bd0e888
No related branches found
No related tags found
No related merge requests found
......@@ -84,6 +84,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
val seq = map.get(k)
......@@ -104,13 +105,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
def mergePair(pair: (K, Seq[Any])) {
val mySeq = getSeq(pair._1)
for (v <- pair._2)
mySeq(depNum) += v
}
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
for ((k, vs) <- fetcher.fetch[K, Seq[Any]](shuffleId, split.index)) {
getSeq(k)(depNum) ++= vs
}
}
}
JavaConversions.mapAsScalaMap(map).iterator
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment