Commit 3260b612 authored by Matei Zaharia

Merge pull request #470 from stephenh/morek

Make CoGroupedRDDs explicitly have the same key type.
Parents: 9d979fb6, ae223468
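This commit narrows the inputs of CoGroupedRDD (and its streaming counterpart CoGroupedDStream) from untyped pairs, (_, _), to key-typed pairs, (K, _): callers must now state the key type in every cast, so co-grouping inputs with different key types becomes visible in the source instead of being hidden behind a fully erased pair type. A minimal sketch of the difference, assuming a SparkContext `sc` and this era's package layout (the RDDs below are hypothetical, not from the commit):

    import spark.{RDD, HashPartitioner}

    val byInt: RDD[(Int, String)]       = sc.parallelize(Seq((1, "a")))
    val byString: RDD[(String, Double)] = sc.parallelize(Seq(("k", 1.0)))
    val part = new HashPartitioner(2)

    // Old signature: both casts erased the key type entirely, so an
    // Int-keyed and a String-keyed RDD could be co-grouped together
    // without anything in the source hinting at the mismatch:
    //   Seq(byInt.asInstanceOf[RDD[(_, _)]], byString.asInstanceOf[RDD[(_, _)]])
    //
    // New signature: each input must be viewed as RDD[(Int, _)], so the
    // second cast below now visibly lies about the key type (it still
    // succeeds under erasure, but the mismatch is in plain sight):
    //   new CoGroupedRDD[Int](
    //     Seq(byInt.asInstanceOf[RDD[(Int, _)]],
    //         byString.asInstanceOf[RDD[(Int, _)]]),  // visibly wrong
    //     part)
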
@@ -363,7 +363,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](
-      Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
+      Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
       partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
     prfs.mapValues {

@@ -382,9 +382,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](
-      Seq(self.asInstanceOf[RDD[(_, _)]],
-          other1.asInstanceOf[RDD[(_, _)]],
-          other2.asInstanceOf[RDD[(_, _)]]),
+      Seq(self.asInstanceOf[RDD[(K, _)]],
+          other1.asInstanceOf[RDD[(K, _)]],
+          other2.asInstanceOf[RDD[(K, _)]]),
       partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
     prfs.mapValues {

@@ -40,8 +40,8 @@ private[spark] class CoGroupAggregator
     { (b1, b2) => b1 ++ b2 })
   with Serializable
 
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
   private val aggr = new CoGroupAggregator

@@ -347,7 +347,7 @@ object CheckpointSuite {
   def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
     //println("First = " + first + ", second = " + second)
     new CoGroupedRDD[K](
-      Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]),
+      Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
     ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
   }

@@ -457,7 +457,7 @@ extends Serializable {
   ): DStream[(K, (Seq[V], Seq[W]))] = {
     val cgd = new CoGroupedDStream[K](
-      Seq(self.asInstanceOf[DStream[(_, _)]], other.asInstanceOf[DStream[(_, _)]]),
+      Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]),
       partitioner
     )
     val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(

@@ -6,7 +6,7 @@ import spark.streaming.{Time, DStream, Duration}
 
 private[streaming]
 class CoGroupedDStream[K : ClassManifest](
-    parents: Seq[DStream[(_, _)]],
+    parents: Seq[DStream[(K, _)]],
     partitioner: Partitioner
   ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {

@@ -101,7 +101,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
 
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
     //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
 
     val numOldValues = oldRDDs.size
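
At each call site the only change is the target of the asInstanceOf cast: the key type K is preserved and only the value type is erased. A hedged usage sketch of the new constructor (hypothetical data; assumes a SparkContext `sc` and that CoGroupedRDD lives in spark.rdd, as in this era's layout):

    import spark.{RDD, HashPartitioner}
    import spark.rdd.CoGroupedRDD

    val users: RDD[(Int, String)] = sc.parallelize(Seq((1, "ann"), (2, "bob")))
    val visits: RDD[(Int, Long)]  = sc.parallelize(Seq((1, 10L), (1, 12L)))

    // Both inputs are cast to RDD[(Int, _)]: K = Int stays visible and
    // checked, while the heterogeneous value types (String, Long) are erased.
    val cg = new CoGroupedRDD[Int](
      Seq(users.asInstanceOf[RDD[(Int, _)]], visits.asInstanceOf[RDD[(Int, _)]]),
      new HashPartitioner(2))

    // cg: RDD[(Int, Seq[Seq[_]])]: for each key, one Seq of values per input RDD.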