Commit 2e6023f2 authored by Matei Zaharia

stuff

parent 309367c4
DAGScheduler.scala (new file):

package spark

import java.util.concurrent._

import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.Map
/**
 * A Scheduler subclass that implements stage-oriented scheduling. It computes
 * a DAG of stages for each job, keeps track of which RDDs and stage outputs
 * are materialized, and computes a minimal schedule to run the job. Subclasses
 * only need to implement the code to send a set of tasks to the cluster and
 * to report completions or failures from it (the submitTasks method, plus
 * calls to taskEnded).
 */
private abstract class DAGScheduler extends Scheduler with Logging {
  // Must be implemented by subclasses to start running a set of tasks
  def submitTasks(tasks: Seq[Task[_]]): Unit

  // Must be called by subclasses to report task completions or failures
  def taskEnded(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) {
    completionEvents.put(CompletionEvent(task, successful, result, accumUpdates))
  }

  private val completionEvents = new LinkedBlockingQueue[CompletionEvent]

  var nextStageId = 0

  def newStageId() = {
    val res = nextStageId
    nextStageId += 1
    res
  }

  val idToStage = new HashMap[Int, Stage]
  val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage]
  val cacheLocs = new HashMap[RDD[_], Array[List[String]]]

  def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
    cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil))
  }

  def addCacheLoc(rdd: RDD[_], partition: Int, host: String) {
    val locs = getCacheLocs(rdd)
    locs(partition) = host :: locs(partition)
  }

  def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) {
    val locs = getCacheLocs(rdd)
    locs(partition) = locs(partition).filterNot(_ == host)
  }
  def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
    shuffleToMapStage.get(shuf) match {
      case Some(stage) => stage
      case None =>
        val stage = newStage(
          true, shuf.rdd, shuf.spec.partitioner.numPartitions)
        shuffleToMapStage(shuf) = stage
        stage
    }
  }

  def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
    val id = newStageId()
    val parents = getParentStages(rdd)
    val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
    idToStage(id) = stage
    stage
  }

  def getParentStages(rdd: RDD[_]): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_,_,_] =>
              parents += getShuffleMapStage(shufDep)
            case _ =>
              visit(dep.rdd)
          }
        }
      }
    }
    visit(rdd)
    parents.toList
  }

  def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val locs = getCacheLocs(rdd)
        for (p <- 0 until rdd.splits.size) {
          if (locs(p) == Nil) {
            for (dep <- rdd.dependencies) {
              dep match {
                case shufDep: ShuffleDependency[_,_,_] =>
                  val stage = getShuffleMapStage(shufDep)
                  if (!stage.isAvailable)
                    missing += stage
                case narrowDep: NarrowDependency[_] =>
                  visit(narrowDep.rdd)
              }
            }
          }
        }
      }
    }
    visit(stage.rdd)
    missing.toList
  }
  override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
      : Array[U] = {
    val numOutputParts: Int = rdd.splits.size
    val finalStage = newStage(false, rdd, numOutputParts)
    val results = new Array[U](numOutputParts)
    val finished = new Array[Boolean](numOutputParts)
    var numFinished = 0

    val waiting = new HashSet[Stage]
    val running = new HashSet[Stage]
    val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]

    def submitStage(stage: Stage) {
      if (!waiting(stage) && !running(stage)) {
        val missing = getMissingParentStages(stage)
        if (missing == Nil) {
          logInfo("Submitting " + stage + ", which has no missing parents")
          submitMissingTasks(stage)
          running += stage
        } else {
          for (parent <- missing)
            submitStage(parent)
          waiting += stage
        }
      }
    }

    def submitMissingTasks(stage: Stage) {
      var tasks: List[Task[_]] = Nil
      if (stage == finalStage) {
        for (p <- 0 until numOutputParts if (!finished(p))) {
          val locs = getPreferredLocs(rdd, p)
          tasks = new ResultTask(rdd, func, p, locs) :: tasks
        }
      }
      submitTasks(tasks)
    }

    submitStage(finalStage)

    while (numFinished != numOutputParts) {
      val evt = completionEvents.take()
      if (evt.successful) {
        Accumulators.add(currentThread, evt.accumUpdates)
        evt.task match {
          case rt: ResultTask[_, _] =>
            results(rt.partition) = evt.result.asInstanceOf[U]
            finished(rt.partition) = true
            numFinished += 1
          // case smt: ShuffleMapTask
        }
      } else {
        throw new SparkException("Task failed: " + evt.task)
        // TODO: Kill the running job
      }
    }

    return results
  }
  def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached != Nil) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
    if (rddPrefs != Nil) {
      return rddPrefs
    }
    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach(_ match {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocs(n.rdd, inPart)
          if (locs != Nil)
            return locs
        }
      case _ => // shuffle dependencies yield no preference; avoid a MatchError
    })
    return Nil
  }
}
case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
  val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
  var numAvailableOutputs = 0

  def isAvailable: Boolean = {
    if (parents.size == 0 && !isShuffleMap)
      true
    else
      numAvailableOutputs == numPartitions
  }

  def addOutputLoc(partition: Int, host: String) {
    val prevList = outputLocs(partition)
    outputLocs(partition) = host :: prevList
    if (prevList == Nil)
      numAvailableOutputs += 1
  }

  def removeOutputLoc(partition: Int, host: String) {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_ == host)
    outputLocs(partition) = newList
    if (prevList != Nil && newList == Nil)
      numAvailableOutputs -= 1
  }

  override def toString = "Stage " + id

  override def hashCode(): Int = id
}
class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
extends Task[U] {
  val split = rdd.splits(partition)

  override def run: U = {
    func(rdd.iterator(split))
  }

  override def preferredLocations: Seq[String] = locs

  override def toString = "ResultTask " + partition
}
\ No newline at end of file
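The interesting control flow above is submitStage: it walks the stage graph depth-first, submitting any stage whose parents are all available and parking the rest in `waiting`. Note that in this commit the completion loop only handles ResultTask (the ShuffleMapTask case is still a commented-out TODO), so only single-stage jobs can actually finish. Below is a self-contained toy sketch of the traversal, with stage availability reduced to a flag; ToyStage and StageWalk are illustrative names, not part of this commit.

import scala.collection.mutable.HashSet

case class ToyStage(id: Int, parents: List[ToyStage], var available: Boolean = false)

object StageWalk {
  val waiting = new HashSet[ToyStage]
  val running = new HashSet[ToyStage]

  def submitStage(stage: ToyStage) {
    if (!waiting(stage) && !running(stage)) {
      val missing = stage.parents.filter(!_.available)
      if (missing.isEmpty) {
        println("Submitting " + stage)  // would call submitMissingTasks(stage)
        running += stage
      } else {
        missing.foreach(submitStage)    // depth-first: parents go first
        waiting += stage                // parked until its parents finish
      }
    }
  }

  def main(args: Array[String]) {
    val mapStage   = ToyStage(1, Nil)            // e.g. the map side of a shuffle
    val finalStage = ToyStage(2, List(mapStage)) // the stage computing the result
    submitStage(finalStage)
    // Prints only "Submitting ToyStage(1,List(),false)"; stage 2 sits in `waiting`.
  }
}

Resubmitting a waiting stage once its parents complete is not implemented here; as the TODO in the event loop suggests, the commit leaves that for later.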
LocalScheduler.scala:

@@ -2,14 +2,10 @@ package spark
 
 import java.util.concurrent._
 
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-
 /**
  * A simple Scheduler implementation that runs tasks locally in a thread pool.
  */
-private class LocalScheduler(threads: Int) extends Scheduler with Logging {
+private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
   var threadPool: ExecutorService =
     Executors.newFixedThreadPool(threads, DaemonThreadFactory)
@@ -17,9 +13,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
   override def waitForRegister() {}
 
-  val completionEvents = new LinkedBlockingQueue[CompletionEvent]
-
-  def submitTasks(tasks: Seq[Task[_]]) {
+  override def submitTasks(tasks: Seq[Task[_]]) {
     tasks.zipWithIndex.foreach { case (task, i) =>
       threadPool.submit(new Runnable {
         def run() {
@@ -36,7 +30,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
           val result: Any = task.run
           val accumUpdates = Accumulators.values
           logInfo("Finished task " + i)
-          completionEvents.put(CompletionEvent(task, true, result, accumUpdates))
+          taskEnded(task, true, result, accumUpdates)
         } catch {
           case e: Exception => {
             // TODO: Do something nicer here
@@ -53,228 +47,4 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
   override def stop() {}
 
   override def numCores() = threads
(roughly 200 deleted lines elided here: the stage bookkeeping, the runJob event loop, and the CompletionEvent, ResultTask, and Stage definitions, all of which move into DAGScheduler.scala above; the only change in the move is that runJob gains the Accumulators.add call)
 }
\ No newline at end of file
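After this change, a concrete scheduler only implements submitTasks and reports results through taskEnded; the completion queue and the blocking event loop stay in the base class. Here is a self-contained sketch of that producer/consumer contract; MiniTask, MiniDAGScheduler, and InlineScheduler are hypothetical stand-ins for the commit's Task, DAGScheduler, and LocalScheduler.

import java.util.concurrent.{Executors, LinkedBlockingQueue}

trait MiniTask[T] { def run: T }

case class Done(task: MiniTask[_], result: Any)

abstract class MiniDAGScheduler {
  // The base class owns the completion queue, like DAGScheduler above
  private val completionEvents = new LinkedBlockingQueue[Done]

  // Subclasses implement this to start tasks (cf. submitTasks)
  def submitTasks(tasks: Seq[MiniTask[_]]): Unit

  // Subclasses call this to report a finished task (cf. taskEnded)
  def taskEnded(task: MiniTask[_], result: Any) {
    completionEvents.put(Done(task, result))
  }

  // The driver loop: submit everything, then block until all results arrive
  def runTasks(tasks: Seq[MiniTask[_]]): Seq[Any] = {
    submitTasks(tasks)
    (1 to tasks.size).map(_ => completionEvents.take().result)
  }
}

class InlineScheduler(threads: Int) extends MiniDAGScheduler {
  private val pool = Executors.newFixedThreadPool(threads)

  def submitTasks(tasks: Seq[MiniTask[_]]) {
    for (task <- tasks)
      pool.submit(new Runnable { def run() { taskEnded(task, task.run) } })
  }

  def stop() { pool.shutdown() }
}

object MiniDemo {
  def main(args: Array[String]) {
    val sched = new InlineScheduler(2)
    val tasks = (1 to 4).map(i => new MiniTask[Int] { def run = i * i })
    println(sched.runTasks(tasks))  // squares in completion order, e.g. Vector(1, 9, 4, 16)
    sched.stop()
  }
}

The real LocalScheduler additionally uses a DaemonThreadFactory and ships accumulator updates along with each completion event.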
RDD.scala:

@@ -12,6 +12,7 @@ import SparkContext._
 import mesos._
 
+@serializable
 abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
 
 abstract class NarrowDependency[T](rdd: RDD[T])
@@ -19,11 +20,16 @@ extends Dependency(rdd, false) {
   def getParents(outputPartition: Int): Seq[Int]
 }
 
+class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
+  override def getParents(partitionId: Int) = List(partitionId)
+}
+
 class ShuffleDependency[K, V, C](
   rdd: RDD[(K, V)],
   val spec: ShuffleSpec[K, V, C]
 ) extends Dependency(rdd, true)
 
+@serializable
 class ShuffleSpec[K, V, C] (
   val createCombiner: V => C,
   val mergeValue: (C, V) => C,
@@ -31,6 +37,7 @@ class ShuffleSpec[K, V, C] (
   val partitioner: Partitioner[K]
 )
 
+@serializable
 abstract class Partitioner[K] {
   def numPartitions: Int
   def getPartition(key: K): Int
@@ -42,8 +49,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   def iterator(split: Split): Iterator[T]
   def preferredLocations(split: Split): Seq[String]
 
-  def dependencies: List[Dependency[_]] = Nil
-  def partitioner: Option[Partitioner[_]] = None
+  val dependencies: List[Dependency[_]] = Nil
+  val partitioner: Option[Partitioner[_]] = None
 
   def taskStarted(split: Split, slot: SlaveOffer) {}
@@ -66,7 +73,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   }
 
   def collect(): Array[T] = {
-    val results = sc.scheduler.runJob(this, (iter: Iterator[T]) => iter.toArray)
+    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
@@ -80,7 +87,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
     else
       None
   }
-  val options = sc.scheduler.runJob(this, reducePartition)
+  val options = sc.runJob(this, reducePartition)
   val results = new ArrayBuffer[T]
   for (opt <- options; elem <- opt)
     results += elem
@@ -177,6 +184,7 @@ extends RDD[U](prev.sparkContext) {
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override def iterator(split: Split) = prev.iterator(split).map(f)
   override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+  override val dependencies = List(new OneToOneDependency(prev))
 }
 
 class FilteredRDD[T: ClassManifest](
...
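The new OneToOneDependency is the first concrete NarrowDependency: output partition i depends on exactly parent partition i, and MappedRDD now declares it, which is what lets getPreferredLocs in DAGScheduler walk lineage backwards through map transformations. A toy, self-contained sketch of how a narrow dependency names its parent partitions follows; the range variant is a hypothetical extension for illustration, not something in this commit.

abstract class ToyNarrowDep {
  def getParents(outputPartition: Int): Seq[Int]
}

// One-to-one: child partition i is computed from parent partition i
class ToyOneToOne extends ToyNarrowDep {
  override def getParents(partitionId: Int) = List(partitionId)
}

// A range dependency (hypothetical here): child partitions
// [outStart, outStart + length) map onto parents starting at inStart
class ToyRangeDep(inStart: Int, outStart: Int, length: Int) extends ToyNarrowDep {
  override def getParents(partitionId: Int) =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil
}

object DepDemo {
  def main(args: Array[String]) {
    println(new ToyOneToOne().getParents(3))         // List(3): same partition, no data movement
    println(new ToyRangeDep(0, 5, 10).getParents(7)) // List(2)
  }
}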
SparkContext.scala:

@@ -14,7 +14,7 @@ class SparkContext(
   val sparkHome: String = null,
   val jars: Seq[String] = Nil)
 extends Logging {
-  private[spark] var scheduler: Scheduler = {
+  private var scheduler: Scheduler = {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     master match {
@@ -139,6 +139,15 @@ extends Logging {
   */
   }
 
+  private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
+      : Array[U] = {
+    logInfo("Starting job...")
+    val start = System.nanoTime
+    val result = scheduler.runJob(rdd, func)
+    logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+    result
+  }
+
   // Clean a closure to make it ready to serialize and send to tasks
   // (removes unreferenced variables in $outer's, updates REPL variables)
   private[spark] def clean[F <: AnyRef](f: F): F = {
...
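runJob on SparkContext is now the single entry point for actions (collect() and reduce() above both route through it), and the scheduler field becomes private, so every job picks up the start/finish log lines. The timing idiom in isolation, as a generic, hypothetical helper not taken from the commit:

object Timing {
  // Evaluate a by-name block, log wall-clock seconds, and return the value
  def timed[T](label: String)(body: => T): T = {
    val start = System.nanoTime
    val result = body
    println(label + " finished in " + (System.nanoTime - start) / 1e9 + " s")
    result
  }

  def main(args: Array[String]) {
    val sum = timed("Job") { (1 to 1000000).map(_.toLong).sum }
    println(sum)  // 500000500000
  }
}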