diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ac84033f06f97db7329badf1e8a907a5a45a82f4
--- /dev/null
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -0,0 +1,108 @@
+package spark
+
+import java.net.URL
+import java.io.EOFException
+import java.io.ObjectInputStream
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+@serializable
+sealed trait CoGroupSplitDep
+case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
+case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+
+@serializable
+class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
+extends Split {
+  override val index = idx
+  override def hashCode(): Int = idx
+}
+
+@serializable
+class CoGroupAggregator[K] extends Aggregator[K, Any, ArrayBuffer[Any]] (
+  { x => ArrayBuffer(x) },
+  { (b, x) => b += x },
+  { (b1, b2) => b1 ++ b2 }
+)
+
+class CoGroupedRDD[K](rdds: Seq[RDD[(K, _)]], part: Partitioner[K])
+extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
+  val aggr = new CoGroupAggregator[K]
+
+  override val dependencies = {
+    val deps = new ArrayBuffer[Dependency[_]]
+    for ((rdd, index) <- rdds.zipWithIndex) {
+      if (rdd.partitioner == Some(part)) {
+        deps += new OneToOneDependency(rdd)
+      } else {
+        deps += new ShuffleDependency[K, Any, ArrayBuffer[Any]](
+            context.newShuffleId, rdd, aggr, part)
+      }
+    }
+    deps.toList
+  }
+
+  @transient val splits_ : Array[Split] = {
+    val firstRdd = rdds.first
+    val array = new Array[Split](part.numPartitions)
+    for (i <- 0 until array.size) {
+      array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+        dependencies(j) match {
+          case s: ShuffleDependency[_, _, _] =>
+            new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+          case _ =>
+            new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
+        }
+      }.toList)
+    }
+    array
+  }
+
+  override def splits = splits_
+
+  override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
+
+  override def preferredLocations(s: Split) = Nil
+
+  override def compute(s: Split): Iterator[(K, Seq[Seq[_]])] = {
+    val split = s.asInstanceOf[CoGroupSplit]
+    val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+    def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
+      map.getOrElseUpdate(k, Array.fill(rdds.size)(new ArrayBuffer[Any]))
+    }
+    for ((dep, index) <- split.deps.zipWithIndex) dep match {
+      case NarrowCoGroupSplitDep(rdd, itsSplit) => {
+        // Read them from the parent
+        for ((k: K, v) <- rdd.iterator(itsSplit)) {
+          getSeq(k)(index) += v
+        }
+      }
+      case ShuffleCoGroupSplitDep(shuffleId) => {
+        // Read map outputs of shuffle
+        val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
+        val serverUris = MapOutputTracker.getServerUris(shuffleId)
+        for ((serverUri, index) <- serverUris.zipWithIndex) {
+          splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
+        }
+        for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
+          for (i <- inputIds) {
+            val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, index)
+            val inputStream = new ObjectInputStream(new URL(url).openStream())
+            try {
+              while (true) {
+                val (k, vs) = inputStream.readObject().asInstanceOf[(K, Seq[Any])]
+                val mySeq = getSeq(k)
+                for (v <- vs)
+                  mySeq(index) += v
+              }
+            } catch {
+              case e: EOFException => {}
+            }
+            inputStream.close()
+          }
+        }
+      }
+    }
+    map.iterator
+  }
+}
\ No newline at end of file
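Usage sketch (not part of the patch): one way the new CoGroupedRDD could be driven from job code. The names cogroupExample, users, orders, and HashPartitioner are illustrative assumptions rather than identifiers taken from this diff; the casts follow from the constructor's Seq[RDD[(K, _)]] parameter type.

    // Hypothetical driver code. `users` and `orders` are assumed pre-built pair RDDs,
    // and HashPartitioner is an assumed Partitioner[K] implementation.
    def cogroupExample(users: RDD[(String, String)], orders: RDD[(String, Int)]) {
      val part = new HashPartitioner[String](4)
      // Inputs already partitioned by `part` get a OneToOneDependency; all others get
      // a ShuffleDependency and are fetched from map outputs inside compute().
      val grouped = new CoGroupedRDD[String](
        Seq(users.asInstanceOf[RDD[(String, _)]], orders.asInstanceOf[RDD[(String, _)]]),
        part)
      // compute() emits one Seq per input RDD, in the order the RDDs were passed in.
      for ((key, groups) <- grouped.collect()) {
        val names  = groups(0)  // values that came from `users`
        val counts = groups(1)  // values that came from `orders`
        println(key + " -> " + names + " / " + counts)
      }
    }
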
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index df044bd6cf931849f372f68ae7c88f9b8669d9f0..4d9a62a89caa1e514902adaadcd94f8882e5349d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -246,4 +246,46 @@ extends RDD[Array[T]](prev.context) {
   def numCores = self.context.numCores
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+
+  def mapValues[U](f: V => U): RDD[(K, U)] =
+  {
+    val cleanF = self.context.clean(f)
+    new MappedValuesRDD(self, cleanF)
+  }
+
+  /*
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    if (self.partitioner != None) {
+      val part = self.partitioner.get
+      if (other.partitioner != None && other.partitioner.get == part) {
+        // Can do a partition-wise cogroup
+        return new PartitionWiseGroupedRDD(self, other)
+      }
+    }
+
+    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+    (vs ++ ws).groupByKey(numSplits).flatMap {
+      case (k, seq) => {
+        val vbuf = new ArrayBuffer[V]
+        val wbuf = new ArrayBuffer[W]
+        seq.foreach(_ match {
+          case Left(v) => vbuf += v
+          case Right(w) => wbuf += w
+        })
+        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+      }
+    }
+  }
+  */
 }
+
+class MappedValuesRDD[K, V, U](
+  prev: RDD[(K, V)], f: V => U)
+extends RDD[(K, U)](prev.context) {
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override val dependencies = List(new OneToOneDependency(prev))
+  override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
+  override val partitioner = prev.partitioner
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 826957a469ac7a02f98306ae7f0a8d5b2b2fdf35..26ccce0e8344a78eb84f956a95c2ce9f3cd4863c 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -16,7 +16,8 @@ class ShuffledRDD[K, V, C](
     aggregator: Aggregator[K, V, C],
     part : Partitioner[K])
 extends RDD[(K, C)](parent.context) {
-  override val partitioner = Some(part)
+  //override val partitioner = Some(part)
+  override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
 
   @transient
   val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
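
Follow-up sketch (also not part of the patch): the partitioner changes in ShuffledRDD and the new MappedValuesRDD are what let CoGroupedRDD take its narrow-dependency branch. A pre-shuffled input that is then transformed with the new mapValues keeps its partitioner, so cogrouping it on the same partitioner needs no second shuffle. Assumed names: clicks, profiles, HashPartitioner, and the implicit conversion that puts mapValues in scope on pair RDDs; ShuffledRDD's first constructor argument is assumed to be the parent RDD.

    import scala.collection.mutable.ArrayBuffer

    // Collects each key's values into a buffer; same constructor shape as CoGroupAggregator.
    @serializable
    class ListAggregator[K, V] extends Aggregator[K, V, ArrayBuffer[V]](
      { v => ArrayBuffer(v) },
      { (buf, v) => buf += v },
      { (b1, b2) => b1 ++ b2 }
    )

    def partitionerReuseExample(clicks: RDD[(String, Int)], profiles: RDD[(String, String)]) = {
      val part = new HashPartitioner[String](4)  // assumed Partitioner[K] implementation

      // One shuffle: the resulting ShuffledRDD now reports partitioner == Some(part).
      val clicksByUser = new ShuffledRDD(clicks, new ListAggregator[String, Int], part)

      // MappedValuesRDD forwards prev.partitioner, so the result is still
      // partitioned by `part` even though the values changed.
      val clickCounts = clicksByUser.mapValues(_.size)

      // CoGroupedRDD sees clickCounts.partitioner == Some(part) and wires a
      // OneToOneDependency for it; only `profiles` needs a ShuffleDependency.
      new CoGroupedRDD[String](
        Seq(clickCounts.asInstanceOf[RDD[(String, _)]],
            profiles.asInstanceOf[RDD[(String, _)]]),
        part)
    }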