diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala
index e751e1bd7511addeb12f738df47b6ba4b63c2c46..10f77a824a334ba49b747e053d778cebd98cfece 100644
--- a/src/scala/spark/DfsShuffle.scala
+++ b/src/scala/spark/DfsShuffle.scala
@@ -12,50 +12,16 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}

 import mesos.SlaveOffer

-/**
- * An RDD that captures the splits of a parent RDD and gives them unique indexes.
- * This is useful for a variety of shuffle implementations.
- */
-class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[(Int, Iterator[T])](prev.sparkContext) {
-  @transient val splits_ = {
-    prev.splits.zipWithIndex.map {
-      case (s, i) => new NumberedSplitRDDSplit(s, i): Split
-    }.toArray
-  }
-
-  override def splits = splits_
-
-  override def preferredLocations(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.preferredLocations(nsplit.prev)
-  }
-
-  override def iterator(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    Iterator((nsplit.index, prev.iterator(nsplit.prev)))
-  }
-
-  override def taskStarted(split: Split, slot: SlaveOffer) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.taskStarted(nsplit.prev, slot)
-  }
-}
-
-
-class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
-  override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
-}
-
-
 /**
  * A simple implementation of shuffle using a distributed file system.
+ *
+ * TODO: Add support for compression when spark.compress is set to true.
  */
 @serializable
 class DfsShuffle[K, V, C](
     rdd: RDD[(K, V)],
     numOutputSplits: Int,
-    createCombiner: () => C,
+    createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C)
 extends Logging
@@ -72,11 +38,12 @@
     numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
       val myIndex = pair._1
       val myIterator = pair._2
-      val combiners = new HashMap[K, C] {
-        override def default(key: K) = createCombiner()
-      }
+      val combiners = new HashMap[K, C]
       for ((k, v) <- myIterator) {
-        combiners(k) = mergeValue(combiners(k), v)
+        combiners(k) = combiners.get(k) match {
+          case Some(c) => mergeValue(c, v)
+          case None => createCombiner(v)
+        }
       }
       val fs = DfsShuffle.getFileSystem()
       val outputStreams = (0 until numOutputSplits).map(i => {
@@ -95,17 +62,18 @@

     // Return an RDD that does each of the merges for a given partition
     return sc.parallelize(0 until numOutputSplits).flatMap((myIndex: Int) => {
-      val combiners = new HashMap[K, C] {
-        override def default(key: K) = createCombiner()
-      }
+      val combiners = new HashMap[K, C]
       val fs = DfsShuffle.getFileSystem()
       for (i <- Utils.shuffle(0 until numInputSplits)) {
         val path = new Path(dir, "%d-to-%d".format(i, myIndex))
         val inputStream = new ObjectInputStream(fs.open(path))
         try {
           while (true) {
-            val pair = inputStream.readObject().asInstanceOf[(K, C)]
-            combiners(pair._1) = mergeCombiners(combiners(pair._1), pair._2)
+            val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+            combiners(k) = combiners.get(k) match {
+              case Some(oldC) => mergeCombiners(oldC, c)
+              case None => c
+            }
           }
         } catch {
           case e: EOFException => {}
@@ -147,3 +115,39 @@ object DfsShuffle {
     return path
   }
 }
+
+
+/**
+ * An RDD that captures the splits of a parent RDD and gives them unique indexes.
+ * This is useful for a variety of shuffle implementations.
+ */
+class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
+extends RDD[(Int, Iterator[T])](prev.sparkContext) {
+  @transient val splits_ = {
+    prev.splits.zipWithIndex.map {
+      case (s, i) => new NumberedSplitRDDSplit(s, i): Split
+    }.toArray
+  }
+
+  override def splits = splits_
+
+  override def preferredLocations(split: Split) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    prev.preferredLocations(nsplit.prev)
+  }
+
+  override def iterator(split: Split) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    Iterator((nsplit.index, prev.iterator(nsplit.prev)))
+  }
+
+  override def taskStarted(split: Split, slot: SlaveOffer) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    prev.taskStarted(nsplit.prev, slot)
+  }
+}
+
+
+class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
+  override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
+}
diff --git a/src/scala/spark/HadoopFile.scala b/src/scala/spark/HadoopFile.scala
index 5746c433eecabce3296a663a739b13228ef0f9a0..a63c9d8a94d90be5fe36fc2316472c316e1bc208 100644
--- a/src/scala/spark/HadoopFile.scala
+++ b/src/scala/spark/HadoopFile.scala
@@ -38,7 +38,7 @@ extends RDD[(K, V)](sc) {
     val conf = new JobConf()
     FileInputFormat.setInputPaths(conf, path)
     val inputFormat = createInputFormat(conf)
-    val inputSplits = inputFormat.getSplits(conf, sc.scheduler.numCores)
+    val inputSplits = inputFormat.getSplits(conf, sc.numCores)
     inputSplits.map(x => new HadoopSplit(x): Split).toArray
   }

diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index e48bb82ec389b3ff72858ee4d6059032622e8f80..3c4aa3c3c9cbf6ac875e0f693c0394c1da6853bf 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -330,8 +330,8 @@ extends RDD[Pair[T, U]](sc) {
   }
 }

-@serializable class PairRDDExtras[K, V](rdd: RDD[(K, V)]) {
-  def reduceByKey(func: (V, V) => V): Map[K, V] = {
+@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
+  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
     def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
       for ((k, v) <- m2) {
         m1.get(k) match {
@@ -341,14 +341,66 @@ extends RDD[Pair[T, U]](sc) {
       }
       return m1
     }
-    rdd.map(pair => HashMap(pair)).reduce(mergeMaps)
+    self.map(pair => HashMap(pair)).reduce(mergeMaps)
   }

-  def combineByKey[C](numSplits: Int,
-                      createCombiner: () => C,
+  def combineByKey[C](createCombiner: V => C,
+                      mergeValue: (C, V) => C,
+                      mergeCombiners: (C, C) => C,
+                      numSplits: Int)
+  : RDD[(K, C)] = {
+    new DfsShuffle(self, numSplits, createCombiner, mergeValue, mergeCombiners).compute()
+  }
+
+  def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+    combineByKey[V]((v: V) => v, func, func, numSplits)
+  }
+
+  def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+    def createCombiner(v: V) = ArrayBuffer(v)
+    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+    val bufs = combineByKey[ArrayBuffer[V]](
+      createCombiner _, mergeValue _, mergeCombiners _, numSplits)
+    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  }
+
+  def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+    new PairRDDExtras(vs ++ ws).groupByKey(numSplits).flatMap {
+      case (k, seq) => {
+        val vbuf = new ArrayBuffer[V]
+        val wbuf = new ArrayBuffer[W]
+        seq.foreach(_ match {
+          case Left(v) => vbuf += v
+          case Right(w) => wbuf += w
+        })
+        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+      }
+    }
+  }
+
+  def combineByKey[C](createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C)
-    : RDD[(K, C)] = {
-    new DfsShuffle(rdd, numSplits, createCombiner, mergeValue, mergeCombiners).compute()
+  : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
+  }
+
+  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+    reduceByKey(func, numCores)
+  }
+
+  def groupByKey(): RDD[(K, Seq[V])] = {
+    groupByKey(numCores)
   }
+
+  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+    join(other, numCores)
+  }
+
+  def numCores = self.sparkContext.numCores
+
+  def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
 }
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index 24fe0e9bbbc9f1b0ec2254d7ca65b00c54418fcd..02e80c77562101ecb2eb83addd82214880e8611c 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -14,7 +14,7 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil)
 extends Logging {
-  private[spark] var scheduler: Scheduler = {
+  private var scheduler: Scheduler = {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     master match {
@@ -41,7 +41,7 @@
     new ParallelArray[T](this, seq, numSlices)

   def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
-    parallelize(seq, scheduler.numCores)
+    parallelize(seq, numCores)

   def textFile(path: String): RDD[String] =
     new HadoopTextFile(this, path)
@@ -147,6 +147,9 @@

     ClosureCleaner.clean(f)
     return f
   }

+
+  // Get the number of cores available to run tasks (as reported by Scheduler)
+  def numCores = scheduler.numCores
 }
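
For reference, the sketch below (not part of the patch) shows how the PairRDDExtras operations added here might be driven from user code. It wraps pair RDDs in PairRDDExtras explicitly, the same pattern the patch itself uses inside join(); whether an implicit conversion to PairRDDExtras exists, and how a SparkContext is constructed, are not shown in this diff, so the example takes an already-built context as a parameter and its object/method names are hypothetical.

import spark.{PairRDDExtras, RDD, SparkContext}

// Illustrative usage only; assumes the classes live in package spark as the file paths suggest.
object PairRDDExample {
  def run(sc: SparkContext) {
    // Build two small pair RDDs on the driver.
    val counts: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val labels: RDD[(String, String)] = sc.parallelize(Seq(("a", "alpha"), ("b", "beta")))

    // Shuffle-based reduce: per-key values are merged with the given function,
    // using numCores output splits by default.
    val reduced = new PairRDDExtras(counts).reduceByKey(_ + _)
    // reduced contains ("a", 4) and ("b", 2)

    // Join on keys: each (k, v) pairs up with every (k, w) sharing its key.
    val joined = new PairRDDExtras(counts).join(labels)
    // joined contains ("a", (1, "alpha")), ("a", (3, "alpha")), ("b", (2, "beta"))

    // Bring the reduced pairs back to the driver as a map.
    val asMap = new PairRDDExtras(reduced).collectAsMap()
    println(asMap)
  }
}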