diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index ac84033f06f97db7329badf1e8a907a5a45a82f4..4c427bd67cc8944c9d45e70b3975bc78d28414ad 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -19,23 +19,25 @@ extends Split {
 }
 
 @serializable
-class CoGroupAggregator[K] extends Aggregator[K, Any, ArrayBuffer[Any]] (
+class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
   { x => ArrayBuffer(x) },
   { (b, x) => b += x },
   { (b1, b2) => b1 ++ b2 }
 )
 
-class CoGroupedRDD[K](rdds: Seq[RDD[(K, _)]], part: Partitioner[K])
-extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
-  val aggr = new CoGroupAggregator[K]
+class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
+extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging {
+  val aggr = new CoGroupAggregator
 
   override val dependencies = {
     val deps = new ArrayBuffer[Dependency[_]]
     for ((rdd, index) <- rdds.zipWithIndex) {
       if (rdd.partitioner == Some(part)) {
+        logInfo("Adding one-to-one dependency with " + rdd)
         deps += new OneToOneDependency(rdd)
       } else {
-        deps += new ShuffleDependency[K, Any, ArrayBuffer[Any]](
+        logInfo("Adding shuffle dependency with " + rdd)
+        deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
             context.newShuffleId, rdd, aggr, part)
       }
     }
@@ -60,7 +62,7 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
 
   override def splits = splits_
 
-  override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
+  override val partitioner = Some(part)
 
   override def preferredLocations(s: Split) = Nil
 
@@ -70,15 +72,16 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
     def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
       map.getOrElseUpdate(k, Array.fill(rdds.size)(new ArrayBuffer[Any]))
     }
-    for ((dep, index) <- split.deps.zipWithIndex) dep match {
+    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, itsSplit) => {
         // Read them from the parent
         for ((k: K, v) <- rdd.iterator(itsSplit)) {
-          getSeq(k)(index) += v
+          getSeq(k)(depNum) += v
         }
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
+        logInfo("Grabbing map outputs for shuffle ID " + shuffleId)
         val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
         val serverUris = MapOutputTracker.getServerUris(shuffleId)
         for ((serverUri, index) <- serverUris.zipWithIndex) {
@@ -86,14 +89,15 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
         }
         for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
           for (i <- inputIds) {
-            val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, index)
+            val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, split.index)
             val inputStream = new ObjectInputStream(new URL(url).openStream())
+            logInfo("Opened stream to " + url)
             try {
               while (true) {
                 val (k, vs) = inputStream.readObject().asInstanceOf[(K, Seq[Any])]
                 val mySeq = getSeq(k)
                 for (v <- vs)
-                  mySeq(index) += v
+                  mySeq(depNum) += v
               }
             } catch {
               case e: EOFException => {}
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 20b0357e44ad96b9d24e7d3af3a2716445d4564c..c83736a424b0ad964a657b0251b944bdccffef8e 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -12,7 +12,7 @@ class ShuffleDependency[K, V, C](
     val shuffleId: Int,
     rdd: RDD[(K, V)],
     val aggregator: Aggregator[K, V, C],
-    val partitioner: Partitioner[K]
+    val partitioner: Partitioner
 ) extends Dependency(rdd, true)
 
 class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index cc1ca744475387129eae87273fa72fb4b9b68f04..92057604da5fd448e5fdfb2e760fcca8f7e0afb6 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -1,21 +1,21 @@
 package spark
 
 @serializable
-abstract class Partitioner[K] {
+abstract class Partitioner {
   def numPartitions: Int
-  def getPartition(key: K): Int
+  def getPartition(key: Any): Int
 }
 
-class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
+class HashPartitioner(partitions: Int) extends Partitioner {
   def numPartitions = partitions
 
-  def getPartition(key: K) = {
+  def getPartition(key: Any) = {
     val mod = key.hashCode % partitions
     if (mod < 0) mod + partitions else mod // Guard against negative hash codes
   }
 
   override def equals(other: Any): Boolean = other match {
-    case h: HashPartitioner[_] =>
+    case h: HashPartitioner =>
       h.numPartitions == numPartitions
     case _ => false
   }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 4d9a62a89caa1e514902adaadcd94f8882e5349d..68e39652f01b24121b97d2122dee5321e883170e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -24,7 +24,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   val dependencies: List[Dependency[_]]
 
   // Optionally overridden by subclasses to specify how they are partitioned
-  val partitioner: Option[Partitioner[_]] = None
+  val partitioner: Option[Partitioner] = None
 
   def context = sc
 
@@ -111,6 +111,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   }
 
   def toArray(): Array[T] = collect()
+
+  override def toString(): String = {
+    "%s(%d)".format(getClass.getSimpleName, id)
+  }
 
   // TODO: Reimplement these to properly build any shuffle dependencies on
   // the cluster rather than attempting to compute a partiton on the master
@@ -191,7 +195,7 @@ extends RDD[Array[T]](prev.context) {
   : RDD[(K, C)] = {
     val aggregator =
       new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    val partitioner = new HashPartitioner[K](numSplits)
+    val partitioner = new HashPartitioner(numSplits)
     new ShuffledRDD(self, aggregator, partitioner)
   }
 
@@ -253,6 +257,33 @@ extends RDD[Array[T]](prev.context) {
     new MappedValuesRDD(self, cleanF)
   }
 
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    val part = self.partitioner match {
+      case Some(p) => p
+      case None => new HashPartitioner(numCores)
+    }
+    new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
+      case (k, Seq(vs, ws)) =>
+        (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
+    }
+  }
+
+  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    val part = self.partitioner match {
+      case Some(p) => p
+      case None => new HashPartitioner(numCores)
+    }
+    new CoGroupedRDD[K](
+        Seq(self.asInstanceOf[RDD[(_, _)]],
+            other1.asInstanceOf[RDD[(_, _)]],
+            other2.asInstanceOf[RDD[(_, _)]]),
+        part).map {
+      case (k, Seq(vs, w1s, w2s)) =>
+        (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
+    }
+  }
+
   /*
   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
     if (self.partitioner != None) {
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
index 287c64a9cc8e10a4e0fa77569e3664ced5f5f749..290950e867693a73bbb7861e505825b523414011 100644
--- a/core/src/main/scala/spark/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -12,7 +12,7 @@ extends Task[String] {
   override def run: String = {
     val numOutputSplits = dep.partitioner.numPartitions
     val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
-    val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
+    val partitioner = dep.partitioner.asInstanceOf[Partitioner]
     val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
     for (elem <- rdd.iterator(split)) {
       val (k, v) = elem.asInstanceOf[(Any, Any)]
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 26ccce0e8344a78eb84f956a95c2ce9f3cd4863c..683df12019199af0ca6b75ba422118efa00e4a44 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -14,10 +14,10 @@ class ShuffledRDDSplit(val idx: Int) extends Split {
 class ShuffledRDD[K, V, C](
     parent: RDD[(K, V)],
     aggregator: Aggregator[K, V, C],
-    part : Partitioner[K])
+    part : Partitioner)
 extends RDD[(K, C)](parent.context) {
   //override val partitioner = Some(part)
-  override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
+  override val partitioner = Some(part)
 
   @transient
   val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
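
Usage sketch (not part of the patch above): the RDD.scala hunks add a groupWith operation built on CoGroupedRDD and the now untyped Partitioner. The snippet below is a minimal, hypothetical driver program exercising that API; the "local" master string, the application name, the sample data, and the explicit split count of 2 are illustrative assumptions, not anything taken from this diff.

    import spark.SparkContext
    import spark.SparkContext._  // assumed to provide the implicit conversion that adds groupWith to pair RDDs

    object GroupWithExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "GroupWithExample")
        // Two RDDs of key-value pairs sharing the key type String.
        val pageNames = sc.parallelize(Seq(("index.html", "Home"), ("about.html", "About")), 2)
        val pageHits  = sc.parallelize(Seq(("index.html", 42), ("index.html", 7), ("about.html", 1)), 2)
        // groupWith co-groups the two RDDs by key via CoGroupedRDD; since neither RDD
        // has a partitioner yet, the HashPartitioner fallback from the patch is used.
        val grouped = pageNames.groupWith(pageHits)  // RDD[(String, (Seq[String], Seq[Int]))]
        grouped.collect().foreach(println)
      }
    }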