From bd336f5f406386c929f2d1f9aecd7d5190a1a087 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@cs.berkeley.edu>
Date: Thu, 10 Jan 2013 17:13:04 -0800
Subject: [PATCH] Changed CoGroupRDD's hash map from Scala to Java.

---
 core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index de0d9fad88..2e051c81c8 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,7 +1,8 @@
 package spark.rdd
 
+import java.util.{HashMap => JHashMap}
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
 
 import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -71,7 +72,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
   override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
     val split = s.asInstanceOf[CoGroupSplit]
     val numRdds = split.deps.size
-    val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+    val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
     def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
       map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))
     }
-- 
GitLab