diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index 5a5fc4c840079cd21e3ba4d269205ffe4d299813..ee3fda25a8d791d07f86268953cc207a66be4009 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -1,10 +1,7 @@
 package spark

-import java.util.concurrent._
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
+import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}

 /**
  * A Scheduler subclass that implements stage-oriented scheduling. It computes
@@ -56,17 +53,15 @@ private abstract class DAGScheduler extends Scheduler with Logging {
     shuffleToMapStage.get(shuf) match {
       case Some(stage) => stage
       case None =>
-        val stage = newStage(
-          true, shuf.rdd, shuf.spec.partitioner.numPartitions)
+        val stage = newStage(shuf.rdd, Some(shuf))
         shuffleToMapStage(shuf) = stage
         stage
     }
   }

-  def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
+  def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
     val id = newStageId()
-    val parents = getParentStages(rdd)
-    val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
+    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
     idToStage(id) = stage
     stage
   }
@@ -121,7 +116,7 @@ private abstract class DAGScheduler extends Scheduler with Logging {
   override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
       : Array[U] = {
     val numOutputParts: Int = rdd.splits.size
-    val finalStage = newStage(false, rdd, numOutputParts)
+    val finalStage = newStage(rdd, None)
     val results = new Array[U](numOutputParts)
     val finished = new Array[Boolean](numOutputParts)
     var numFinished = 0
@@ -130,6 +125,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
     val running = new HashSet[Stage]
     val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]

+    logInfo("Final stage: " + finalStage)
+    logInfo("Parents of final stage: " + finalStage.parents)
+    logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
     def submitStage(stage: Stage) {
       if (!waiting(stage) && !running(stage)) {
         val missing = getMissingParentStages(stage)
@@ -146,13 +145,20 @@ private abstract class DAGScheduler extends Scheduler with Logging {
     }

     def submitMissingTasks(stage: Stage) {
-      var tasks: List[Task[_]] = Nil
+      val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
+      var tasks = ArrayBuffer[Task[_]]()
       if (stage == finalStage) {
         for (p <- 0 until numOutputParts if (!finished(p))) {
           val locs = getPreferredLocs(rdd, p)
-          tasks = new ResultTask(rdd, func, p, locs) :: tasks
+          tasks += new ResultTask(finalStage.id, rdd, func, p, locs)
+        }
+      } else {
+        for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
+          val locs = getPreferredLocs(stage.rdd, p)
+          tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
         }
       }
+      myPending ++= tasks
       submitTasks(tasks)
     }

@@ -161,13 +167,35 @@ private abstract class DAGScheduler extends Scheduler with Logging {
     while (numFinished != numOutputParts) {
       val evt = completionEvents.take()
       if (evt.successful) {
+        logInfo("Completed " + evt.task)
         Accumulators.add(currentThread, evt.accumUpdates)
         evt.task match {
           case rt: ResultTask[_, _] =>
             results(rt.partition) = evt.result.asInstanceOf[U]
             finished(rt.partition) = true
             numFinished += 1
-            // case smt: ShuffleMapTask
+            pendingTasks(finalStage) -= rt
+          case smt: ShuffleMapTask =>
+            val stage = idToStage(smt.stageId)
+            stage.addOutputLoc(smt.partition, evt.result.asInstanceOf[String])
+            val pending = pendingTasks(stage)
+            pending -= smt
+            MapOutputTracker.registerMapOutputs(
+              stage.shuffleDep.get.shuffleId,
+              stage.outputLocs.map(_.first).toArray)
+            if (pending.isEmpty) {
+              logInfo(stage + " finished; looking for newly runnable stages")
+              running -= stage
+              val newlyRunnable = new ArrayBuffer[Stage]
+              for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+                newlyRunnable += stage
+              }
+              waiting --= newlyRunnable
+              running ++= newlyRunnable
+              for (stage <- newlyRunnable) {
+                submitMissingTasks(stage)
+              }
+            }
         }
       } else {
         throw new SparkException("Task failed: " + evt.task)
@@ -199,53 +227,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
         if (locs != Nil)
           return locs;
       }
+      case _ =>
     })
     return Nil
   }
 }

 case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
-
-class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
-  val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
-  var numAvailableOutputs = 0
-
-  def isAvailable: Boolean = {
-    if (parents.size == 0 && !isShuffleMap)
-      true
-    else
-      numAvailableOutputs == numPartitions
-  }
-
-  def addOutputLoc(partition: Int, host: String) {
-    val prevList = outputLocs(partition)
-    outputLocs(partition) = host :: prevList
-    if (prevList == Nil)
-      numAvailableOutputs += 1
-  }
-
-  def removeOutputLoc(partition: Int, host: String) {
-    val prevList = outputLocs(partition)
-    val newList = prevList - host
-    outputLocs(partition) = newList
-    if (prevList != Nil && newList == Nil)
-      numAvailableOutputs -= 1
-  }
-
-  override def toString = "Stage " + id
-
-  override def hashCode(): Int = id
-}
-
-class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
-extends Task[U] {
-  val split = rdd.splits(partition)
-
-  override def run: U = {
-    func(rdd.iterator(split))
-  }
-
-  override def preferredLocations: Seq[String] = locs
-
-  override def toString = "ResultTask " + partition
-}
\ No newline at end of file
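
For orientation, the control flow above can be driven end to end from a small program. The sketch below is illustrative only and not part of the patch: the object name is made up, it assumes the existing local-mode SparkContext constructor and parallelize method, and it constructs PairRDDExtras explicitly instead of relying on an implicit conversion.

    import spark._

    // Hypothetical driver: one ShuffleDependency, so runJob builds a shuffle-map
    // stage (4 ShuffleMapTasks over `pairs`) feeding a final stage (2 ResultTasks).
    object TwoStageDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "TwoStageDemo")   // assumed local-mode constructor
        val pairs = sc.parallelize(1 to 100, 4).map(i => (i % 3, 1))
        val counts = new PairRDDExtras(pairs).combineByKey(
          (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2, 2)
        // ResultTasks only run after every map output location is registered above.
        println(counts.collect().toList)   // counts per key: 33, 34, 33 (order not guaranteed)
      }
    }
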
diff --git a/core/src/main/scala/spark/DfsShuffle.scala b/core/src/main/scala/spark/DfsShuffle.scala
deleted file mode 100644
index 7a42bf2d06f624b40ad7d9c17ba01e9849524772..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/DfsShuffle.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-package spark
-
-import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
-import java.net.URI
-import java.util.UUID
-
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
-
-
-/**
- * A simple implementation of shuffle using a distributed file system.
- *
- * TODO: Add support for compression when spark.compress is set to true.
- */
-@serializable
-class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
-  override def compute(input: RDD[(K, V)],
-                       numOutputSplits: Int,
-                       createCombiner: V => C,
-                       mergeValue: (C, V) => C,
-                       mergeCombiners: (C, C) => C)
-  : RDD[(K, C)] =
-  {
-    val sc = input.sparkContext
-    val dir = DfsShuffle.newTempDirectory()
-    logInfo("Intermediate data directory: " + dir)
-
-    val numberedSplitRdd = new NumberedSplitRDD(input)
-    val numInputSplits = numberedSplitRdd.splits.size
-
-    // Run a parallel foreach to write the intermediate data files
-    numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
-      val myIndex = pair._1
-      val myIterator = pair._2
-      val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
-      for ((k, v) <- myIterator) {
-        var bucketId = k.hashCode % numOutputSplits
-        if (bucketId < 0) {  // Fix bucket ID if hash code was negative
-          bucketId += numOutputSplits
-        }
-        val bucket = buckets(bucketId)
-        bucket(k) = bucket.get(k) match {
-          case Some(c) => mergeValue(c, v)
-          case None => createCombiner(v)
-        }
-      }
-      val fs = DfsShuffle.getFileSystem()
-      for (i <- 0 until numOutputSplits) {
-        val path = new Path(dir, "%d-to-%d".format(myIndex, i))
-        val out = new ObjectOutputStream(fs.create(path, true))
-        buckets(i).foreach(pair => out.writeObject(pair))
-        out.close()
-      }
-    })
-
-    // Return an RDD that does each of the merges for a given partition
-    val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
-    return indexes.flatMap((myIndex: Int) => {
-      val combiners = new HashMap[K, C]
-      val fs = DfsShuffle.getFileSystem()
-      for (i <- Utils.shuffle(0 until numInputSplits)) {
-        val path = new Path(dir, "%d-to-%d".format(i, myIndex))
-        val inputStream = new ObjectInputStream(fs.open(path))
-        try {
-          while (true) {
-            val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
-            combiners(k) = combiners.get(k) match {
-              case Some(oldC) => mergeCombiners(oldC, c)
-              case None => c
-            }
-          }
-        } catch {
-          case e: EOFException => {}
-        }
-        inputStream.close()
-      }
-      combiners
-    })
-  }
-}
-
-
-/**
- * Companion object of DfsShuffle; responsible for initializing a Hadoop
- * FileSystem object based on the spark.dfs property and generating names
- * for temporary directories.
- */
-object DfsShuffle {
-  private var initialized = false
-  private var fileSystem: FileSystem = null
-
-  private def initializeIfNeeded() = synchronized {
-    if (!initialized) {
-      val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-      val dfs = System.getProperty("spark.dfs", "file:///")
-      val conf = new Configuration()
-      conf.setInt("io.file.buffer.size", bufferSize)
-      conf.setInt("dfs.replication", 1)
-      fileSystem = FileSystem.get(new URI(dfs), conf)
-      initialized = true
-    }
-  }
-
-  def getFileSystem(): FileSystem = {
-    initializeIfNeeded()
-    return fileSystem
-  }
-
-  def newTempDirectory(): String = {
-    val fs = getFileSystem()
-    val workDir = System.getProperty("spark.dfs.workdir", "/tmp")
-    val uuid = UUID.randomUUID()
-    val path = workDir + "/shuffle-" + uuid
-    fs.mkdirs(new Path(path))
-    return path
-  }
-}
diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala
index 367599cfb499dd6bb457a248856c7b417c3092ef..fd70c54c0c7b35d3e49b202e1b6c56d49c6270bd 100644
--- a/core/src/main/scala/spark/LocalFileShuffle.scala
+++ b/core/src/main/scala/spark/LocalFileShuffle.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
+/*
 @serializable
 class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
   override def compute(input: RDD[(K, V)],
@@ -90,7 +91,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
     })
   }
 }
-
+*/

 object LocalFileShuffle extends Logging {
   private var initialized = false
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 26fc5c9fdbe688aef85594cdba30f1c576adb944..0287082687cb32d34dd1330341b2842a4ef14b1e 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -25,12 +25,12 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
           Accumulators.clear
           val bytes = Utils.serialize(tasks(i))
           logInfo("Size of task " + i + " is " + bytes.size + " bytes")
-          val task = Utils.deserialize[Task[_]](
+          val deserializedTask = Utils.deserialize[Task[_]](
             bytes, currentThread.getContextClassLoader)
-          val result: Any = task.run
+          val result: Any = deserializedTask.run
           val accumUpdates = Accumulators.values
           logInfo("Finished task " + i)
-          taskEnded(task, true, result, accumUpdates)
+          taskEnded(tasks(i), true, result, accumUpdates)
         } catch {
           case e: Exception => {
             // TODO: Do something nicer here
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2c487cb627626a87f73c3488b5c9d210604cad0f
--- /dev/null
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -0,0 +1,29 @@
+package spark
+
+import java.util.concurrent.ConcurrentHashMap
+
+object MapOutputTracker {
+  private val serverUris = new ConcurrentHashMap[Int, Array[String]]
+
+  def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
+    var array = serverUris.get(shuffleId)
+    if (array == null) {
+      array = Array.fill[String](numMaps)(null)
+      serverUris.put(shuffleId, array)
+    }
+    array(mapId) = serverUri
+  }
+
+  def registerMapOutputs(shuffleId: Int, locs: Array[String]) {
+    serverUris.put(shuffleId, Array[String]() ++ locs)
+  }
+
+  def getServerUris(shuffleId: Int): Array[String] = {
+    // TODO: On remote node, fetch locations from master
+    serverUris.get(shuffleId)
+  }
+
+  def getMapOutputUri(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int): String = {
+    "%s/shuffle/%s/%s/%s".format(serverUri, shuffleId, mapId, reduceId)
+  }
+}
\ No newline at end of file
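
As a quick check of how the two registration paths and the URI helper fit together, here is an illustrative sketch (not part of the patch; the object name and host names are made up) of what a reducer for partition 3 of shuffle 0 would fetch:

    import spark.MapOutputTracker

    object TrackerDemo {
      def main(args: Array[String]) {
        // Two map tasks of shuffle 0 report the URI of the server holding their output.
        MapOutputTracker.registerMapOutput(0, 2, 0, "http://host-a:12345")
        MapOutputTracker.registerMapOutput(0, 2, 1, "http://host-b:12345")
        // Equivalently, DAGScheduler registers a whole stage at once via registerMapOutputs.
        for ((uri, mapId) <- MapOutputTracker.getServerUris(0).zipWithIndex)
          println(MapOutputTracker.getMapOutputUri(uri, 0, mapId, 3))
        // -> http://host-a:12345/shuffle/0/0/3
        //    http://host-b:12345/shuffle/0/1/3
      }
    }
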
diff --git a/core/src/main/scala/spark/NumberedSplitRDD.scala b/core/src/main/scala/spark/NumberedSplitRDD.scala
deleted file mode 100644
index 7b12210d849e97b44d8a4e07789f5d81e4b2f00a..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/NumberedSplitRDD.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package spark
-
-import mesos.SlaveOffer
-
-
-/**
- * An RDD that takes the splits of a parent RDD and gives them unique indexes.
- * This is useful for a variety of shuffle implementations.
- */
-class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[(Int, Iterator[T])](prev.sparkContext) {
-  @transient val splits_ = {
-    prev.splits.zipWithIndex.map {
-      case (s, i) => new NumberedSplitRDDSplit(s, i): Split
-    }.toArray
-  }
-
-  override def splits = splits_
-
-  override def preferredLocations(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.preferredLocations(nsplit.prev)
-  }
-
-  override def iterator(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    Iterator((nsplit.index, prev.iterator(nsplit.prev)))
-  }
-
-  override def taskStarted(split: Split, slot: SlaveOffer) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.taskStarted(nsplit.prev, slot)
-  }
-}
-
-
-/**
- * A split in a NumberedSplitRDD.
- */
-class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
-  override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
-}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 37baed6e4cd8a06798d6b2136e4113babf5c2b0d..9b650427c81f02581ca853d89e5d76d7dd1d04a1 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,5 +1,8 @@
 package spark

+import java.io.EOFException
+import java.net.URL
+import java.io.ObjectInputStream
 import java.util.concurrent.atomic.AtomicLong
 import java.util.HashSet
 import java.util.Random
@@ -25,16 +28,17 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
 }

 class ShuffleDependency[K, V, C](
+    val shuffleId: Int,
     rdd: RDD[(K, V)],
-    val spec: ShuffleSpec[K, V, C]
+    val aggregator: Aggregator[K, V, C],
+    val partitioner: Partitioner[K]
 ) extends Dependency(rdd, true)

 @serializable
-class ShuffleSpec[K, V, C] (
+class Aggregator[K, V, C] (
   val createCombiner: V => C,
   val mergeValue: (C, V) => C,
-  val mergeCombiners: (C, C) => C,
-  val partitioner: Partitioner[K]
+  val mergeCombiners: (C, C) => C
 )

 @serializable
@@ -43,6 +47,15 @@ abstract class Partitioner[K] {
   def getPartition(key: K): Int
 }

+class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
+  def numPartitions = partitions
+
+  def getPartition(key: K) = {
+    val mod = key.hashCode % partitions
+    if (mod < 0) mod + partitions else mod  // Careful of negative hash codes
+  }
+}
+
 @serializable
 abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   def splits: Array[Split]
@@ -52,25 +65,27 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   val dependencies: List[Dependency[_]] = Nil
   val partitioner: Option[Partitioner[_]] = None

-  def taskStarted(split: Split, slot: SlaveOffer) {}
-
   def sparkContext = sc

   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+
+  def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+    new FlatMappedRDD(this, sc.clean(f))
+
+  /*
   def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+
   def cache() = new CachedRDD(this)

   def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
     new SampledRDD(this, withReplacement, frac, seed)

-  def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
-    new FlatMappedRDD(this, sc.clean(f))
-
   def foreach(f: T => Unit) {
     val cleanF = sc.clean(f)
     val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
     sc.runTaskObjects(tasks)
   }
+  */

   def collect(): Array[T] = {
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
@@ -97,6 +112,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
     return results.reduceLeft(f)
   }

+  /*
   def take(num: Int): Array[T] = {
     if (num == 0)
       return new Array[T](0)
@@ -113,6 +129,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
     case Array(t) => t
     case _ => throw new UnsupportedOperationException("empty collection")
   }
+  */

   def count(): Long = {
     try {
@@ -126,10 +143,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {

   def ++(other: RDD[T]): RDD[T] = this.union(other)

-  def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
+  //def splitRdd(): RDD[Array[T]] = new SplitRDD(this)

-  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
-    new CartesianRDD(sc, this, other)
+  //def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
+  //  new CartesianRDD(sc, this, other)

   def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
     this.map(t => (func(t), t)).groupByKey(numSplits)
@@ -138,72 +155,31 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
     groupBy[K](func, sc.numCores)
 }

-@serializable
-abstract class RDDTask[U: ClassManifest, T: ClassManifest](
-  val rdd: RDD[T], val split: Split)
-extends Task[U] {
-  override def preferredLocations() = rdd.preferredLocations(split)
-  override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) }
-}
-
-class ForeachTask[T: ClassManifest](
-  rdd: RDD[T], split: Split, func: T => Unit)
-extends RDDTask[Unit, T](rdd, split) with Logging {
-  override def run() {
-    logInfo("Processing " + split)
-    rdd.iterator(split).foreach(func)
-  }
-}
-
-class CollectTask[T](
-  rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
-extends RDDTask[Array[T], T](rdd, split) with Logging {
-  override def run(): Array[T] = {
-    logInfo("Processing " + split)
-    rdd.iterator(split).toArray(m)
-  }
-}
-
-class ReduceTask[T: ClassManifest](
-  rdd: RDD[T], split: Split, f: (T, T) => T)
-extends RDDTask[Option[T], T](rdd, split) with Logging {
-  override def run(): Option[T] = {
-    logInfo("Processing " + split)
-    val iter = rdd.iterator(split)
-    if (iter.hasNext)
-      Some(iter.reduceLeft(f))
-    else
-      None
-  }
-}
-
 class MappedRDD[U: ClassManifest, T: ClassManifest](
   prev: RDD[T], f: T => U)
 extends RDD[U](prev.sparkContext) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override val dependencies = List(new OneToOneDependency(prev))
   override def iterator(split: Split) = prev.iterator(split).map(f)
-  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+}
+
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+  prev: RDD[T], f: T => Traversable[U])
+extends RDD[U](prev.sparkContext) {
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override val dependencies = List(new OneToOneDependency(prev))
+  override def iterator(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
 }

+/*
 class FilteredRDD[T: ClassManifest](
   prev: RDD[T], f: T => Boolean)
 extends RDD[T](prev.sparkContext) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override def iterator(split: Split) = prev.iterator(split).filter(f)
-  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
-}
-
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
-  prev: RDD[T], f: T => Traversable[U])
-extends RDD[U](prev.sparkContext) {
-  override def splits = prev.splits
-  override def preferredLocations(split: Split) = prev.preferredLocations(split)
-  override def iterator(split: Split) =
-    prev.iterator(split).toStream.flatMap(f).iterator
-  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
 }

 class SplitRDD[T: ClassManifest](prev: RDD[T])
@@ -211,7 +187,6 @@ extends RDD[Array[T]](prev.sparkContext) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
-  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
 }


@@ -245,8 +220,6 @@ extends RDD[T](prev.sparkContext) {
       prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
     }
   }
-
-  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot)
 }


@@ -316,13 +289,14 @@ private object CachedRDD {
   // Remembers which splits are currently being loaded (on workers)
   val loading = new HashSet[String]
 }
+*/

 @serializable
-class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split)
+class UnionSplit[T: ClassManifest](rdd: RDD[T], index: Int, split: Split)
 extends Split {
   def iterator() = rdd.iterator(split)
   def preferredLocations() = rdd.preferredLocations(split)
-  override def getId() = "UnionSplit(" + split.getId() + ")"
+  override def getId() = "UnionSplit(" + index + ", " + split.getId() + ")"
 }

 @serializable
@@ -330,8 +304,8 @@ class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
 extends RDD[T](sc) {
   @transient val splits_ : Array[Split] = {
     val splits: Seq[Split] =
-      for (rdd <- rdds; split <- rdd.splits)
-        yield new UnionSplit(rdd, split)
+      for ((rdd, index) <- rdds.zipWithIndex; split <- rdd.splits)
+        yield new UnionSplit(rdd, index, split)
     splits.toArray
   }

@@ -344,6 +318,7 @@ extends RDD[T](sc) {
     s.asInstanceOf[UnionSplit[T]].preferredLocations()
 }

+/*
 @serializable
 class CartesianSplit(val s1: Split, val s2: Split) extends Split {
   override def getId() = "CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
@@ -376,6 +351,58 @@ extends RDD[Pair[T, U]](sc) {
     rdd2.taskStarted(currSplit.s2, slot)
   }
 }
+*/
+
+class ShuffledRDDSplit(val id: Int) extends Split {
+  override def getId() = "ShuffleRDDSplit(" + id + ")"
+}
+
+class ShuffledRDD[K, V, C](
+    parent: RDD[(K, V)],
+    aggregator: Aggregator[K, V, C],
+    partitioner: Partitioner[K])
+extends RDD[(K, C)](parent.sparkContext) {
+  @transient val splits_ =
+    Array.tabulate[Split](partitioner.numPartitions)(i => new ShuffledRDDSplit(i))
+
+  val dep = new ShuffleDependency(sparkContext.newShuffleId, parent, aggregator, partitioner)
+
+  override def splits = splits_
+
+  override def preferredLocations(split: Split) = Nil
+
+  override def iterator(split: Split): Iterator[(K, C)] = {
+    val shuffleId = dep.shuffleId
+    val splitId = split.asInstanceOf[ShuffledRDDSplit].id
+    val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
+    val serverUris = MapOutputTracker.getServerUris(shuffleId)
+    for ((serverUri, index) <- serverUris.zipWithIndex) {
+      splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
+    }
+    val combiners = new HashMap[K, C]
+    for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
+      for (i <- inputIds) {
+        val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
+        val inputStream = new ObjectInputStream(new URL(url).openStream())
+        try {
+          while (true) {
+            val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+            combiners(k) = combiners.get(k) match {
+              case Some(oldC) => aggregator.mergeCombiners(oldC, c)
+              case None => c
+            }
+          }
+        } catch {
+          case e: EOFException => {}
+        }
+        inputStream.close()
+      }
+    }
+    combiners.iterator
+  }
+
+  override val dependencies = List(dep)
+}

 @serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
   def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -397,10 +424,16 @@ extends RDD[Pair[T, U]](sc) {
       numSplits: Int)
   : RDD[(K, C)] =
   {
+    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    val partitioner = new HashPartitioner[K](numSplits)
+    new ShuffledRDD(self, aggregator, partitioner)
+    // TODO
+    /*
     val shufClass = Class.forName(System.getProperty(
       "spark.shuffle.class", "spark.LocalFileShuffle"))
     val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
     shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
+    */
   }

   def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
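
Two small pieces above are easy to sanity-check in isolation: HashPartitioner's handling of negative hash codes, and the Aggregator that combineByKey now hands to ShuffledRDD. An illustrative sketch (not part of the patch; the object name is made up):

    import spark.{Aggregator, HashPartitioner}

    object PartitionerDemo {
      def main(args: Array[String]) {
        val part = new HashPartitioner[Int](4)
        println(part.getPartition(7))    // 7 % 4 = 3
        println(part.getPartition(-7))   // -7 % 4 = -3 on the JVM, fixed up to -3 + 4 = 1

        // A word-count style Aggregator, as combineByKey would build it:
        val agg = new Aggregator[String, Int, Int](
          (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
        println(agg.mergeValue(agg.createCombiner(1), 1))   // 2
      }
    }
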
diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3b63896175e9d34d9601e0efcb1ab6059236c096
--- /dev/null
+++ b/core/src/main/scala/spark/ResultTask.scala
@@ -0,0 +1,14 @@
+package spark
+
+class ResultTask[T, U](val stageId: Int, rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+  val split = rdd.splits(partition)
+
+  override def run: U = {
+    func(rdd.iterator(split))
+  }
+
+  override def preferredLocations: Seq[String] = locs
+
+  override def toString = "ResultTask(" + stageId + ", " + partition + ")"
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala
new file mode 100644
index 0000000000000000000000000000000000000000..287c64a9cc8e10a4e0fa77569e3664ced5f5f749
--- /dev/null
+++ b/core/src/main/scala/spark/ShuffleMapTask.scala
@@ -0,0 +1,38 @@
+package spark
+
+import java.io.FileOutputStream
+import java.io.ObjectOutputStream
+import scala.collection.mutable.HashMap
+
+
+class ShuffleMapTask(val stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String])
+extends Task[String] {
+  val split = rdd.splits(partition)
+
+  override def run: String = {
+    val numOutputSplits = dep.partitioner.numPartitions
+    val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
+    val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
+    val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
+    for (elem <- rdd.iterator(split)) {
+      val (k, v) = elem.asInstanceOf[(Any, Any)]
+      var bucketId = partitioner.getPartition(k)
+      val bucket = buckets(bucketId)
+      bucket(k) = bucket.get(k) match {
+        case Some(c) => aggregator.mergeValue(c, v)
+        case None => aggregator.createCombiner(v)
+      }
+    }
+    for (i <- 0 until numOutputSplits) {
+      val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i)
+      val out = new ObjectOutputStream(new FileOutputStream(file))
+      buckets(i).foreach(pair => out.writeObject(pair))
+      out.close()
+    }
+    return LocalFileShuffle.getServerUri
+  }
+
+  override def preferredLocations: Seq[String] = locs
+
+  override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
+}
\ No newline at end of file
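
Stripped of the file I/O, the bucket-and-combine loop in ShuffleMapTask.run behaves as in this standalone illustration (not part of the patch; the object name and the toy records are made up):

    import scala.collection.mutable.HashMap

    object BucketDemo {
      def main(args: Array[String]) {
        val numOutputSplits = 2
        val records = List(("a", 1), ("b", 1), ("a", 1), ("c", 1), ("b", 1))
        // One combiner map per output partition, as in ShuffleMapTask.run.
        val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[String, Int])
        for ((k, v) <- records) {
          val mod = k.hashCode % numOutputSplits
          val bucketId = if (mod < 0) mod + numOutputSplits else mod  // same rule as HashPartitioner
          val bucket = buckets(bucketId)
          bucket(k) = bucket.get(k) match {
            case Some(c) => c + v   // mergeValue
            case None => v          // createCombiner
          }
        }
        // Each bucket i would then be written to one local file and served over HTTP
        // to the reducer responsible for output partition i.
        buckets.zipWithIndex.foreach { case (b, i) => println("bucket " + i + ": " + b) }
      }
    }
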
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 04bd86180b76019e46bbc958227feccc505c912e..b4799d7c080ee89cf59e7bf47fe501f01fcacdc0 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -89,8 +89,8 @@ extends Logging {
   }

   /** Build the union of a list of RDDs. */
-  def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
-    new UnionRDD(this, rdds)
+  //def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
+  //  new UnionRDD(this, rdds)

   // Methods for creating shared variables

@@ -157,6 +157,14 @@ extends Logging {

   // Get the number of cores available to run tasks (as reported by Scheduler)
   def numCores = scheduler.numCores
+
+  private var nextShuffleId: Int = 0
+
+  private[spark] def newShuffleId(): Int = {
+    val id = nextShuffleId
+    nextShuffleId += 1
+    id
+  }
 }


diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala
new file mode 100644
index 0000000000000000000000000000000000000000..82b70ce60d78cfcfc081f616bae1393efc5995bc
--- /dev/null
+++ b/core/src/main/scala/spark/Stage.scala
@@ -0,0 +1,34 @@
+package spark
+
+class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) {
+  val isShuffleMap = shuffleDep != None
+  val numPartitions = rdd.splits.size
+  val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+  var numAvailableOutputs = 0
+
+  def isAvailable: Boolean = {
+    if (parents.size == 0 && !isShuffleMap)
+      true
+    else
+      numAvailableOutputs == numPartitions
+  }
+
+  def addOutputLoc(partition: Int, host: String) {
+    val prevList = outputLocs(partition)
+    outputLocs(partition) = host :: prevList
+    if (prevList == Nil)
+      numAvailableOutputs += 1
+  }
+
+  def removeOutputLoc(partition: Int, host: String) {
+    val prevList = outputLocs(partition)
+    val newList = prevList - host
+    outputLocs(partition) = newList
+    if (prevList != Nil && newList == Nil)
+      numAvailableOutputs -= 1
+  }
+
+  override def toString = "Stage " + id
+
+  override def hashCode(): Int = id
+}
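
Finally, the shuffle ids minted by SparkContext.newShuffleId are what tie a ShuffledRDD's dependency, the MapOutputTracker entries, and the shuffle file layout together. An illustrative sketch (not part of the patch; it assumes a local SparkContext, the existing parallelize method, and that these are the first shuffles created on the context):

    import spark._

    object ShuffleIdDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ShuffleIdDemo")
        val pairs = sc.parallelize(List(("x", 1), ("y", 2)), 1)
        val agg = new Aggregator[String, Int, Int](
          (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
        val r1 = new ShuffledRDD(pairs, agg, new HashPartitioner[String](2))
        val r2 = new ShuffledRDD(pairs, agg, new HashPartitioner[String](2))
        println(r1.dep.shuffleId)   // 0, assuming no earlier shuffles on this context
        println(r2.dep.shuffleId)   // 1
      }
    }
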