From ae2234687d9040b42619c374eadfd40c896d386d Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sat, 16 Feb 2013 13:10:31 -0600
Subject: [PATCH] Make CoGroupedRDDs explicitly have the same key type.

---
 core/src/main/scala/spark/PairRDDFunctions.scala          | 8 ++++----
 core/src/main/scala/spark/rdd/CoGroupedRDD.scala          | 4 ++--
 core/src/test/scala/spark/CheckpointSuite.scala           | 2 +-
 .../main/scala/spark/streaming/PairDStreamFunctions.scala | 2 +-
 .../scala/spark/streaming/dstream/CoGroupedDStream.scala  | 2 +-
 .../spark/streaming/dstream/ReducedWindowedDStream.scala  | 2 +-
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index cc3cca2571..36b9880cd1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](
-        Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
+        Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
         partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
     prfs.mapValues {
@@ -380,9 +380,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](
-        Seq(self.asInstanceOf[RDD[(_, _)]],
-            other1.asInstanceOf[RDD[(_, _)]],
-            other2.asInstanceOf[RDD[(_, _)]]),
+        Seq(self.asInstanceOf[RDD[(K, _)]],
+            other1.asInstanceOf[RDD[(K, _)]],
+            other2.asInstanceOf[RDD[(K, _)]]),
         partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
     prfs.mapValues {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 0a1e2cbee0..868ee5a39f 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -40,8 +40,8 @@ private[spark] class CoGroupAggregator
     { (b1, b2) => b1 ++ b2 })
   with Serializable
 
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
   private val aggr = new CoGroupAggregator
 
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0d08fd2396..51ff966ae4 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -347,7 +347,7 @@ object CheckpointSuite {
   def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
     //println("First = " + first + ", second = " + second)
     new CoGroupedRDD[K](
-      Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]),
+      Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
     ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
   }
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index fbcf061126..5db3844f1d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -457,7 +457,7 @@ extends Serializable {
     ): DStream[(K, (Seq[V], Seq[W]))] = {
 
     val cgd = new CoGroupedDStream[K](
-      Seq(self.asInstanceOf[DStream[(_, _)]], other.asInstanceOf[DStream[(_, _)]]),
+      Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
       partitioner
     )
     val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index ddb1bf6b28..4ef4bb7de1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -6,7 +6,7 @@ import spark.streaming.{Time, DStream, Duration}
 
 private[streaming]
 class CoGroupedDStream[K : ClassManifest](
-    parents: Seq[DStream[(_, _)]],
+    parents: Seq[DStream[(K, _)]],
     partitioner: Partitioner
   ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 733d5c4a25..263655039c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -101,7 +101,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
 
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
     //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
 
     val numOldValues = oldRDDs.size
-- 
GitLab