diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5a5fc4c840079cd21e3ba4d269205ffe4d299813
--- /dev/null
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -0,0 +1,251 @@
+package spark
+
+import java.util.concurrent._
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.Map
+
+/**
+ * A Scheduler subclass that implements stage-oriented scheduling. It computes
+ * a DAG of stages for each job, keeps track of which RDDs and stage outputs
+ * are materialized, and computes a minimal schedule to run the job. Subclasses
+ * only need to implement the code to send a task to the cluster and to report
+ * failures from it (the submitTasks method, and code to add completionEvents).
+ */
+private abstract class DAGScheduler extends Scheduler with Logging {
+  // Must be implemented by subclasses to start running a set of tasks
+  def submitTasks(tasks: Seq[Task[_]]): Unit
+
+  // Must be called by subclasses to report task completions or failures
+  def taskEnded(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) {
+    completionEvents.put(CompletionEvent(task, successful, result, accumUpdates))
+  }
+
+  private val completionEvents = new LinkedBlockingQueue[CompletionEvent]
+
+  var nextStageId = 0
+
+  def newStageId() = {
+    var res = nextStageId
+    nextStageId += 1
+    res
+  }
+
+  val idToStage = new HashMap[Int, Stage]
+
+  val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
+
+  val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
+
+  def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+    cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
+  }
+
+  def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+    val locs = getCacheLocs(rdd)
+    locs(partition) = host :: locs(partition)
+  }
+
+  def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
+    val locs = getCacheLocs(rdd)
+    locs(partition) -= host
+  }
+
+  def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
+    shuffleToMapStage.get(shuf) match {
+      case Some(stage) => stage
+      case None =>
+        val stage = newStage(
+          true, shuf.rdd, shuf.spec.partitioner.numPartitions)
+        shuffleToMapStage(shuf) = stage
+        stage
+    }
+  }
+
+  def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
+    val id = newStageId()
+    val parents = getParentStages(rdd)
+    val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
+    idToStage(id) = stage
+    stage
+  }
+
+  def getParentStages(rdd: RDD[_]): List[Stage] = {
+    val parents = new HashSet[Stage]
+    val visited = new HashSet[RDD[_]]
+    def visit(r: RDD[_]) {
+      if (!visited(r)) {
+        visited += r
+        for (dep <- r.dependencies) {
+          dep match {
+            case shufDep: ShuffleDependency[_,_,_] =>
+              parents += getShuffleMapStage(shufDep)
+            case _ =>
+              visit(dep.rdd)
+          }
+        }
+      }
+    }
+    visit(rdd)
+    parents.toList
+  }
+
+  def getMissingParentStages(stage: Stage): List[Stage] = {
+    val missing = new HashSet[Stage]
+    val visited = new HashSet[RDD[_]]
+    def visit(rdd: RDD[_]) {
+      if (!visited(rdd)) {
+        visited += rdd
+        val locs = getCacheLocs(rdd)
+        for (p <- 0 until rdd.splits.size) {
+          if (locs(p) == Nil) {
+            for (dep <- rdd.dependencies) {
+              dep match {
+                case shufDep: ShuffleDependency[_,_,_] =>
+                  val stage = getShuffleMapStage(shufDep)
+                  if (!stage.isAvailable)
+                    missing += stage
+                case narrowDep: NarrowDependency[_] =>
+                  visit(narrowDep.rdd)
+              }
+            }
+          }
+        }
+      }
+    }
+    visit(stage.rdd)
+    missing.toList
+  }
+
+  override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+      : Array[U] = {
+    val numOutputParts: Int = rdd.splits.size
+    val finalStage = newStage(false, rdd, numOutputParts)
+    val results = new Array[U](numOutputParts)
+    val finished = new Array[Boolean](numOutputParts)
+    var numFinished = 0
+
+    val waiting = new HashSet[Stage]
+    val running = new HashSet[Stage]
+    val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
+
+    def submitStage(stage: Stage) {
+      if (!waiting(stage) && !running(stage)) {
+        val missing = getMissingParentStages(stage)
+        if (missing == Nil) {
+          logInfo("Submitting " + stage + ", which has no missing parents")
+          submitMissingTasks(stage)
+          running += stage
+        } else {
+          for (parent <- missing)
+            submitStage(parent)
+          waiting += stage
+        }
+      }
+    }
+
+    def submitMissingTasks(stage: Stage) {
+      var tasks: List[Task[_]] = Nil
+      if (stage == finalStage) {
+        for (p <- 0 until numOutputParts if (!finished(p))) {
+          val locs = getPreferredLocs(rdd, p)
+          tasks = new ResultTask(rdd, func, p, locs) :: tasks
+        }
+      }
+      submitTasks(tasks)
+    }
+
+    submitStage(finalStage)
+
+    while (numFinished != numOutputParts) {
+      val evt = completionEvents.take()
+      if (evt.successful) {
+        Accumulators.add(currentThread, evt.accumUpdates)
+        evt.task match {
+          case rt: ResultTask[_, _] =>
+            results(rt.partition) = evt.result.asInstanceOf[U]
+            finished(rt.partition) = true
+            numFinished += 1
+          // case smt: ShuffleMapTask
+        }
+      } else {
+        throw new SparkException("Task failed: " + evt.task)
+        // TODO: Kill the running job
+      }
+    }
+
+    return results
+  }
+
+  def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+    // If the partition is cached, return the cache locations
+    val cached = getCacheLocs(rdd)(partition)
+    if (cached != Nil) {
+      return cached
+    }
+    // If the RDD has some placement preferences (as is the case for input RDDs), get those
+    val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
+    if (rddPrefs != Nil) {
+      return rddPrefs
+    }
+    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+    // that has any placement preferences. Ideally we would choose based on transfer sizes,
+    // but this will do for now.
+    rdd.dependencies.foreach(_ match {
+      case n: NarrowDependency[_] =>
+        for (inPart <- n.getParents(partition)) {
+          val locs = getPreferredLocs(n.rdd, inPart)
+          if (locs != Nil)
+            return locs;
+        }
+    })
+    return Nil
+  }
+}
+
+case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
+
+class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
+  val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
+  var numAvailableOutputs = 0
+
+  def isAvailable: Boolean = {
+    if (parents.size == 0 && !isShuffleMap)
+      true
+    else
+      numAvailableOutputs == numPartitions
+  }
+
+  def addOutputLoc(partition: Int, host: String) {
+    val prevList = outputLocs(partition)
+    outputLocs(partition) = host :: prevList
+    if (prevList == Nil)
+      numAvailableOutputs += 1
+  }
+
+  def removeOutputLoc(partition: Int, host: String) {
+    val prevList = outputLocs(partition)
+    val newList = prevList - host
+    outputLocs(partition) = newList
+    if (prevList != Nil && newList == Nil)
+      numAvailableOutputs -= 1
+  }
+
+  override def toString = "Stage " + id
+
+  override def hashCode(): Int = id
+}
+
+class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
+extends Task[U] {
+  val split = rdd.splits(partition)
+
+  override def run: U = {
+    func(rdd.iterator(split))
+  }
+
+  override def preferredLocations: Seq[String] = locs
+
+  override def toString = "ResultTask " + partition
+}
\ No newline at end of file
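The class comment above spells out the contract for concrete schedulers: implement submitTasks to hand tasks to an execution backend, and call taskEnded as results arrive so that the completionEvents queue drained by runJob makes progress. The following sketch illustrates that contract only; it compiles only against this tree (it must sit in package spark), and SynchronousScheduler is a hypothetical name, not part of the patch.

    package spark

    // Hypothetical DAGScheduler subclass: run every task inline on the calling
    // thread and report its result straight back through taskEnded.
    private class SynchronousScheduler extends DAGScheduler {
      override def start() {}
      override def waitForRegister() {}
      override def stop() {}
      override def numCores() = 1

      override def submitTasks(tasks: Seq[Task[_]]) {
        for (task <- tasks) {
          val result: Any = task.run
          // taskEnded enqueues a CompletionEvent, which unblocks the
          // completionEvents.take() loop inside DAGScheduler.runJob.
          taskEnded(task, true, result, Accumulators.values)
        }
      }
    }

LocalScheduler below is reworked along exactly these lines, keeping only its thread-pool execution logic.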
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index 8518229856fec8bc840964474cbc2f89b8c4d6b4..26fc5c9fdbe688aef85594cdba30f1c576adb944 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -2,14 +2,10 @@ package spark
 
 import java.util.concurrent._
 
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-
 /**
  * A simple Scheduler implementation that runs tasks locally in a thread pool.
  */
-private class LocalScheduler(threads: Int) extends Scheduler with Logging {
+private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
   var threadPool: ExecutorService =
     Executors.newFixedThreadPool(threads, DaemonThreadFactory)
 
@@ -17,9 +13,7 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
 
   override def waitForRegister() {}
 
-  val completionEvents = new LinkedBlockingQueue[CompletionEvent]
-
-  def submitTasks(tasks: Seq[Task[_]]) {
+  override def submitTasks(tasks: Seq[Task[_]]) {
     tasks.zipWithIndex.foreach { case (task, i) =>
       threadPool.submit(new Runnable {
         def run() {
@@ -36,7 +30,7 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
             val result: Any = task.run
            val accumUpdates = Accumulators.values
            logInfo("Finished task " + i)
-            completionEvents.put(CompletionEvent(task, true, result, accumUpdates))
+            taskEnded(task, true, result, accumUpdates)
          } catch {
            case e: Exception => {
              // TODO: Do something nicer here
@@ -53,228 +47,4 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging {
   override def stop() {}
 
   override def numCores() = threads
-
-  var nextStageId = 0
-
-  def newStageId() = {
-    var res = nextStageId
-    nextStageId += 1
-    res
-  }
-
-  val idToStage = new HashMap[Int, Stage]
-
-  val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
-
-  val cacheLocs = new HashMap[RDD[_], Array[List[String]]]
-
-  def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
-    cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
-  }
-
-  def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
-    val locs = getCacheLocs(rdd)
-    locs(partition) = host :: locs(partition)
-  }
-
-  def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
-    val locs = getCacheLocs(rdd)
-    locs(partition) -= host
-  }
-
-  def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
-    shuffleToMapStage.get(shuf) match {
-      case Some(stage) => stage
-      case None =>
-        val stage = newStage(
-          true, shuf.rdd, shuf.spec.partitioner.numPartitions)
-        shuffleToMapStage(shuf) = stage
-        stage
-    }
-  }
-
-  def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
-    val id = newStageId()
-    val parents = getParentStages(rdd)
-    val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
-    idToStage(id) = stage
-    stage
-  }
-
-  def getParentStages(rdd: RDD[_]): List[Stage] = {
-    val parents = new HashSet[Stage]
-    val visited = new HashSet[RDD[_]]
-    def visit(r: RDD[_]) {
-      if (!visited(r)) {
-        visited += r
-        for (dep <- r.dependencies) {
-          dep match {
-            case shufDep: ShuffleDependency[_,_,_] =>
-              parents += getShuffleMapStage(shufDep)
-            case _ =>
-              visit(dep.rdd)
-          }
-        }
-      }
-    }
-    visit(rdd)
-    parents.toList
-  }
-
-  def getMissingParentStages(stage: Stage): List[Stage] = {
-    val missing = new HashSet[Stage]
-    val visited = new HashSet[RDD[_]]
-    def visit(rdd: RDD[_]) {
-      if (!visited(rdd)) {
-        visited += rdd
-        val locs = getCacheLocs(rdd)
-        for (p <- 0 until rdd.splits.size) {
-          if (locs(p) == Nil) {
-            for (dep <- rdd.dependencies) {
-              dep match {
-                case shufDep: ShuffleDependency[_,_,_] =>
-                  val stage = getShuffleMapStage(shufDep)
-                  if (!stage.isAvailable)
-                    missing += stage
-                case narrowDep: NarrowDependency[_] =>
-                  visit(narrowDep.rdd)
-              }
-            }
-          }
-        }
-      }
-    }
-    visit(stage.rdd)
-    missing.toList
-  }
-
-  override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
-      : Array[U] = {
-    val numOutputParts: Int = rdd.splits.size
-    val finalStage = newStage(false, rdd, numOutputParts)
-    val results = new Array[U](numOutputParts)
-    val finished = new Array[Boolean](numOutputParts)
-    var numFinished = 0
-
-    val waiting = new HashSet[Stage]
-    val running = new HashSet[Stage]
-    val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
-
-    def submitStage(stage: Stage) {
-      if (!waiting(stage) && !running(stage)) {
-        val missing = getMissingParentStages(stage)
-        if (missing == Nil) {
-          logInfo("Submitting " + stage + ", which has no missing parents")
-          submitMissingTasks(stage)
-          running += stage
-        } else {
-          for (parent <- missing)
-            submitStage(parent)
-          waiting += stage
-        }
-      }
-    }
-
-    def submitMissingTasks(stage: Stage) {
-      var tasks: List[Task[_]] = Nil
-      if (stage == finalStage) {
-        for (p <- 0 until numOutputParts if (!finished(p))) {
-          val locs = getPreferredLocs(rdd, p)
-          tasks = new ResultTask(rdd, func, p, locs) :: tasks
-        }
-      }
-      submitTasks(tasks)
-    }
-
-    submitStage(finalStage)
-
-    while (numFinished != numOutputParts) {
-      val evt = completionEvents.take()
-      if (evt.successful) {
-        evt.task match {
-          case rt: ResultTask[_, _] =>
-            results(rt.partition) = evt.result.asInstanceOf[U]
-            finished(rt.partition) = true
-            numFinished += 1
-          // case smt: ShuffleMapTask
-        }
-      } else {
-        throw new SparkException("Task failed: " + evt.task)
-        // TODO: Kill the running job
-      }
-    }
-
-    return results
-  }
-
-  def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
-    // If the partition is cached, return the cache locations
-    val cached = getCacheLocs(rdd)(partition)
-    if (cached != Nil) {
-      return cached
-    }
-    // If the RDD has some placement preferences (as is the case for input RDDs), get those
-    val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
-    if (rddPrefs != Nil) {
-      return rddPrefs
-    }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
-    rdd.dependencies.foreach(_ match {
-      case n: NarrowDependency[_] =>
-        for (inPart <- n.getParents(partition)) {
-          val locs = getPreferredLocs(n.rdd, inPart)
-          if (locs != Nil)
-            return locs;
-        }
-    })
-    return Nil
-  }
-}
-
-case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
-
-class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
-extends Task[U] {
-  val split = rdd.splits(partition)
-
-  override def run: U = {
-    func(rdd.iterator(split))
-  }
-
-  override def preferredLocations: Seq[String] = locs
-
-  override def toString = "ResultTask " + partition
-}
-
-class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
-  val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
-  var numAvailableOutputs = 0
-
-  def isAvailable: Boolean = {
-    if (parents.size == 0 && !isShuffleMap)
-      true
-    else
-      numAvailableOutputs == numPartitions
-  }
-
-  def addOutputLoc(partition: Int, host: String) {
-    val prevList = outputLocs(partition)
-    outputLocs(partition) = host :: prevList
-    if (prevList == Nil)
-      numAvailableOutputs += 1
-  }
-
-  def removeOutputLoc(partition: Int, host: String) {
-    val prevList = outputLocs(partition)
-    val newList = prevList - host
-    outputLocs(partition) = newList
-    if (prevList != Nil && newList == Nil)
-      numAvailableOutputs -= 1
-  }
-
-  override def toString = "Stage " + id
-
-  override def hashCode(): Int = id
 }
\ No newline at end of file
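With the duplicated DAG logic removed, LocalScheduler keeps only its thread-pool execution role; which stages exist is decided entirely by the dependency types that RDDs declare (see the RDD.scala changes below). getParentStages and getMissingParentStages follow NarrowDependency edges within a stage and start a new shuffle map stage at every ShuffleDependency. As a sketch of how a narrow dependency describes its parent partitions, here is a hypothetical RangeDependency, not part of the patch, written against the NarrowDependency class shown below:

    // Hypothetical narrow dependency: output partition i reads a contiguous
    // block of `fanIn` parent partitions. Because it is narrow, the scheduler
    // traverses it inside a stage instead of creating a shuffle map stage.
    class RangeDependency[T](rdd: RDD[T], fanIn: Int) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): Seq[Int] =
        (partitionId * fanIn) until ((partitionId + 1) * fanIn)
    }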
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 391b54f4eb73f6470d74882d2782c4816c8cb1bd..37baed6e4cd8a06798d6b2136e4113babf5c2b0d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import SparkContext._
 
 import mesos._
 
+@serializable
 abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
 
 abstract class NarrowDependency[T](rdd: RDD[T])
@@ -19,11 +20,16 @@ extends Dependency(rdd, false) {
   def getParents(outputPartition: Int): Seq[Int]
 }
 
+class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
+  override def getParents(partitionId: Int) = List(partitionId)
+}
+
 class ShuffleDependency[K, V, C](
   rdd: RDD[(K, V)],
   val spec: ShuffleSpec[K, V, C]
 ) extends Dependency(rdd, true)
 
+@serializable
 class ShuffleSpec[K, V, C] (
   val createCombiner: V => C,
   val mergeValue: (C, V) => C,
@@ -31,6 +37,7 @@ class ShuffleSpec[K, V, C] (
   val partitioner: Partitioner[K]
 )
 
+@serializable
 abstract class Partitioner[K] {
   def numPartitions: Int
   def getPartition(key: K): Int
@@ -42,8 +49,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   def iterator(split: Split): Iterator[T]
   def preferredLocations(split: Split): Seq[String]
 
-  def dependencies: List[Dependency[_]] = Nil
-  def partitioner: Option[Partitioner[_]] = None
+  val dependencies: List[Dependency[_]] = Nil
+  val partitioner: Option[Partitioner[_]] = None
 
   def taskStarted(split: Split, slot: SlaveOffer) {}
 
@@ -66,7 +73,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   }
 
   def collect(): Array[T] = {
-    val results = sc.scheduler.runJob(this, (iter: Iterator[T]) => iter.toArray)
+    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
 
@@ -80,7 +87,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
       else
        None
    }
-    val options = sc.scheduler.runJob(this, reducePartition)
+    val options = sc.runJob(this, reducePartition)
     val results = new ArrayBuffer[T]
     for (opt <- options; elem <- opt)
       results += elem
@@ -177,6 +184,7 @@ extends RDD[U](prev.sparkContext) {
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override def iterator(split: Split) = prev.iterator(split).map(f)
   override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+  override val dependencies = List(new OneToOneDependency(prev))
 }
 
 class FilteredRDD[T: ClassManifest](
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 7c3058792800fc4ff9dcabcda9235bfd3db986ea..04bd86180b76019e46bbc958227feccc505c912e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -14,7 +14,7 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil)
 extends Logging {
-  private[spark] var scheduler: Scheduler = {
+  private var scheduler: Scheduler = {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     master match {
@@ -139,6 +139,15 @@ extends Logging {
     */
   }
 
+  private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+      : Array[U] = {
+    logInfo("Starting job...")
+    val start = System.nanoTime
+    val result = scheduler.runJob(rdd, func)
+    logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+    result
+  }
+
   // Clean a closure to make it ready to serialized and send to tasks
   // (removes unreferenced variables in $outer's, updates REPL variables)
   private[spark] def clean[F <: AnyRef](f: F): F = {
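Taken together, the patch wires the pieces end to end: an action such as collect() now calls SparkContext.runJob, which logs timing and delegates to the scheduler's runJob, which builds the final stage and submits one ResultTask per partition. A usage sketch, assuming the usual SparkContext(master, frameworkName) constructor and the parallelize method from this tree (both outside this excerpt):

    val sc = new SparkContext("local[2]", "dagSchedulerExample")
    // map yields an RDD whose dependencies now carry a OneToOneDependency,
    // so the lineage stays in one final stage with a ResultTask per split.
    val doubled = sc.parallelize(1 to 4, 2).map(_ * 2)
    println(doubled.collect().toList)   // expected: List(2, 4, 6, 8)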