Skip to content
Snippets Groups Projects
Commit 999eaac7 authored by Karen Feng's avatar Karen Feng
Browse files

Merge branch 'master' of https://github.com/mesos/spark

parents ef1f22bd e466a55a
No related branches found
No related tags found
No related merge requests found
Showing
with 168 additions and 167 deletions
...@@ -62,43 +62,31 @@ ...@@ -62,43 +62,31 @@
<groupId>org.spark-project</groupId> <groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId> <artifactId>spark-core</artifactId>
<classifier>${classifier.name}</classifier> <classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.spark-project</groupId> <groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId> <artifactId>spark-bagel</artifactId>
<classifier>${classifier.name}</classifier> <classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.spark-project</groupId> <groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId> <artifactId>spark-mllib</artifactId>
<classifier>${classifier.name}</classifier> <classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version> <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<classifier>javadoc</classifier>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<classifier>sources</classifier>
<version>0.8.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.spark-project</groupId> <groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId> <artifactId>spark-repl</artifactId>
<classifier>${classifier.name}</classifier> <classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.spark-project</groupId> <groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId> <artifactId>spark-streaming</artifactId>
<classifier>${classifier.name}</classifier> <classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version> <version>${project.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
<include>org.spark-project:*:jar</include> <include>org.spark-project:*:jar</include>
</includes> </includes>
<excludes> <excludes>
<exclude>org.spark-project:spark-dist:jar</exclude> <exclude>org.spark-project:spark-assembly:jar</exclude>
</excludes> </excludes>
</dependencySet> </dependencySet>
<dependencySet> <dependencySet>
......
...@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._ ...@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map import scala.collection.Map
import scala.collection.generic.Growable import scala.collection.generic.Growable
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.util.DynamicVariable import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap} import scala.collection.mutable.{ConcurrentMap, HashMap}
...@@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary ...@@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult} import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource} import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
...@@ -574,6 +577,25 @@ class SparkContext( ...@@ -574,6 +577,25 @@ class SparkContext(
env.blockManager.master.getStorageStatus env.blockManager.master.getStorageStatus
} }
/**
 * Return the pools used by the fair scheduler, i.e. the schedulable queue that hangs
 * directly off the task scheduler's root pool.
 * TODO(xiajunluan): nested pools are not taken into account yet.
 */
def getPools: ArrayBuffer[Schedulable] = {
  val topLevelPool = taskScheduler.rootPool
  topLevelPool.schedulableQueue
}
/**
 * Return the scheduling mode currently reported by the task scheduler.
 */
def getSchedulingMode: SchedulingMode.SchedulingMode = taskScheduler.schedulingMode
/**
 * Return the root pool's map from schedulable name to the schedulable registered
 * under that name.
 */
def getPoolNameToPool: HashMap[String, Schedulable] = {
  val topLevelPool = taskScheduler.rootPool
  topLevelPool.schedulableNameToSchedulable
}
/** /**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to * Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes. * any new nodes.
......
...@@ -25,17 +25,25 @@ import akka.dispatch.Await ...@@ -25,17 +25,25 @@ import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._ import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
import spark.Utils import spark.Utils
import spark.deploy.DeployWebUI import spark.deploy.DeployWebUI
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.JsonProtocol
import spark.deploy.master.{ApplicationInfo, WorkerInfo} import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) { private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master val master = parent.master
implicit val timeout = parent.timeout implicit val timeout = parent.timeout
/**
 * JSON view of the master state.
 *
 * Asks the master actor for its current state, blocks for up to 30 seconds waiting for
 * the reply, and serializes the response with JsonProtocol.writeMasterState.
 * The incoming request is not inspected.
 */
def renderJson(request: HttpServletRequest): JValue = {
  val pendingState = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
  JsonProtocol.writeMasterState(Await.result(pendingState, 30 seconds))
}
/** Index view listing applications and executors */ /** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = { def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
......
...@@ -61,6 +61,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging { ...@@ -61,6 +61,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)), ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)), ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request)) ("*", (request: HttpServletRequest) => indexPage.render(request))
) )
......
...@@ -510,6 +510,12 @@ class DAGScheduler( ...@@ -510,6 +510,12 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
} }
} }
// Listeners must be notified before a possible NotSerializableException is hit below,
// so that "StageSubmitted" is posted first and "JobEnded" only after it.
val properties = idToActiveJob(stage.priority).properties
sparkListeners.foreach(_.onStageSubmitted(
SparkListenerStageSubmitted(stage, tasks.size, properties)))
if (tasks.size > 0) { if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this // Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception // exception here because it would be fairly hard to catch the non-serializable exception
...@@ -524,11 +530,9 @@ class DAGScheduler( ...@@ -524,11 +530,9 @@ class DAGScheduler(
return return
} }
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks myPending ++= tasks
logDebug("New pending tasks: " + myPending) logDebug("New pending tasks: " + myPending)
val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks( taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties)) new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) { if (!stage.submissionTime.isDefined) {
......
...@@ -62,7 +62,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -62,7 +62,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
event match { event match {
case SparkListenerJobStart(job, properties) => case SparkListenerJobStart(job, properties) =>
processJobStartEvent(job, properties) processJobStartEvent(job, properties)
case SparkListenerStageSubmitted(stage, taskSize) => case SparkListenerStageSubmitted(stage, taskSize, properties) =>
processStageSubmittedEvent(stage, taskSize) processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) => case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo) processStageCompletedEvent(stageInfo)
......
...@@ -25,7 +25,8 @@ import spark.executor.TaskMetrics ...@@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
...@@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa ...@@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo, case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents taskMetrics: TaskMetrics) extends SparkListenerEvents
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents extends SparkListenerEvents
trait SparkListener { trait SparkListener {
...@@ -45,7 +46,7 @@ trait SparkListener { ...@@ -45,7 +46,7 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage * Called when a stage is completed, with information on the completed stage
*/ */
def onStageCompleted(stageCompleted: StageCompleted) { } def onStageCompleted(stageCompleted: StageCompleted) { }
/** /**
* Called when a stage is submitted * Called when a stage is submitted
*/ */
...@@ -65,12 +66,12 @@ trait SparkListener { ...@@ -65,12 +66,12 @@ trait SparkListener {
* Called when a job starts * Called when a job starts
*/ */
def onJobStart(jobStart: SparkListenerJobStart) { } def onJobStart(jobStart: SparkListenerJobStart) { }
/** /**
* Called when a job ends * Called when a job ends
*/ */
def onJobEnd(jobEnd: SparkListenerJobEnd) { } def onJobEnd(jobEnd: SparkListenerJobEnd) { }
} }
/** /**
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package spark.scheduler package spark.scheduler
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** /**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
...@@ -25,6 +27,11 @@ package spark.scheduler ...@@ -25,6 +27,11 @@ package spark.scheduler
* the TaskSchedulerListener interface. * the TaskSchedulerListener interface.
*/ */
private[spark] trait TaskScheduler { private[spark] trait TaskScheduler {
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit def start(): Unit
// Invoked after system has successfully initialized (typically in spark context). // Invoked after system has successfully initialized (typically in spark context).
......
...@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet ...@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
import spark._ import spark._
import spark.TaskState.TaskState import spark.TaskState.TaskState
import spark.scheduler._ import spark.scheduler._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer} import java.util.{TimerTask, Timer}
...@@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null var rootPool: Pool = null
// Scheduling mode is read from the "spark.cluster.schedulingmode" system property;
// FIFO is used when the property is not set.
val schedulingMode: SchedulingMode = {
  val configuredMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
  SchedulingMode.withName(configuredMode)
}
override def setListener(listener: TaskSchedulerListener) { override def setListener(listener: TaskSchedulerListener) {
this.listener = listener this.listener = listener
...@@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) { def initialize(context: SchedulerBackend) {
backend = context backend = context
//default scheduler is FIFO // temporarily set rootPool name to empty
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO") rootPool = new Pool("", schedulingMode, 0, 0)
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
schedulableBuilder = { schedulableBuilder = {
schedulingMode match { schedulingMode match {
case "FIFO" => case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool) new FIFOSchedulableBuilder(rootPool)
case "FAIR" => case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool) new FairSchedulableBuilder(rootPool)
} }
} }
...@@ -270,10 +272,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -270,10 +272,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
} }
var launchedTask = false var launchedTask = false
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue() val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue)
{ for (manager <- sortedTaskSetQueue) {
logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks)) logDebug("parentName:%s, name:%s, runningTasks:%s".format(
manager.parent.name, manager.name, manager.runningTasks))
} }
for (manager <- sortedTaskSetQueue) { for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks. // Split offers based on node local, rack local and off-rack tasks.
......
...@@ -85,7 +85,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: ...@@ -85,7 +85,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job // Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4 val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation // Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
...@@ -107,9 +107,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: ...@@ -107,9 +107,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
var runningTasks = 0 var runningTasks = 0
var priority = taskSet.priority var priority = taskSet.priority
var stageId = taskSet.stageId var stageId = taskSet.stageId
var name = "TaskSet_" + taskSet.stageId.toString var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null var parent: Schedulable = null
// Last time when we launched a preferred task (for delay scheduling) // Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis var lastPreferredLaunchTime = System.currentTimeMillis
...@@ -697,18 +696,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: ...@@ -697,18 +696,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
} }
} }
// TODO: for now we just find Pool not TaskSetManager, // TODO(xiajunluan): for now we just find Pool not TaskSetManager
// we can extend this function in future if needed // we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = { override def getSchedulableByName(name: String): Schedulable = {
return null return null
} }
override def addSchedulable(schedulable:Schedulable) { override def addSchedulable(schedulable:Schedulable) {
//nothing // nothing
} }
override def removeSchedulable(schedulable:Schedulable) { override def removeSchedulable(schedulable:Schedulable) {
//nothing // nothing
} }
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
......
...@@ -17,14 +17,18 @@ ...@@ -17,14 +17,18 @@
package spark.scheduler.cluster package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import scala.collection.mutable.ArrayBuffer
/** /**
* An interface for schedulable entities. * An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers) * there are two type of Schedulable entities(Pools and TaskSetManagers)
*/ */
private[spark] trait Schedulable { private[spark] trait Schedulable {
var parent: Schedulable var parent: Schedulable
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int def weight: Int
def minShare: Int def minShare: Int
def runningTasks: Int def runningTasks: Int
......
...@@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder { ...@@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties) def addTaskSetManager(manager: Schedulable, properties: Properties)
} }
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() { override def buildPools() {
//nothing // nothing
} }
override def addTaskSetManager(manager: Schedulable, properties: Properties) { override def addTaskSetManager(manager: Schedulable, properties: Properties) {
...@@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula ...@@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
} }
} }
private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified") val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
...@@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula ...@@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
} }
} }
//finally create "default" pool // finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool) rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
...@@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula ...@@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName) parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) { if (parentPool == null) {
//we will create a new pool that user has configured in app instead of being defined in xml file // we will create a new pool that user has configured in app
parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) // instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool) rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
......
...@@ -17,8 +17,13 @@ ...@@ -17,8 +17,13 @@
package spark.scheduler.cluster package spark.scheduler.cluster
object SchedulingMode extends Enumeration("FAIR","FIFO"){ /**
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
type SchedulingMode = Value type SchedulingMode = Value
val FAIR,FIFO = Value val FAIR,FIFO,NONE = Value
} }
...@@ -23,7 +23,10 @@ import spark.TaskState.TaskState ...@@ -23,7 +23,10 @@ import spark.TaskState.TaskState
import spark.scheduler.TaskSet import spark.scheduler.TaskSet
private[spark] trait TaskSetManager extends Schedulable { private[spark] trait TaskSetManager extends Schedulable {
// A TaskSetManager exposes no child queue: callers asking for one get null back.
def schedulableQueue = null
// No ordering policy applies here, so the mode is reported as NONE.
def schedulingMode = SchedulingMode.NONE
def taskSet: TaskSet def taskSet: TaskSet
def slaveOffer( def slaveOffer(
......
...@@ -29,6 +29,7 @@ import spark.TaskState.TaskState ...@@ -29,6 +29,7 @@ import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader import spark.executor.ExecutorURLClassLoader
import spark.scheduler._ import spark.scheduler._
import spark.scheduler.cluster._ import spark.scheduler.cluster._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._ import akka.actor._
/** /**
...@@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: ...@@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null var rootPool: Pool = null
// Scheduling mode is read from the "spark.cluster.schedulingmode" system property;
// FIFO is used when the property is not set.
val schedulingMode: SchedulingMode = {
  val configuredMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
  SchedulingMode.withName(configuredMode)
}
val activeTaskSets = new HashMap[String, TaskSetManager] val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]] val taskSetTaskIds = new HashMap[String, HashSet[Long]]
...@@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: ...@@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null var localActor: ActorRef = null
override def start() { override def start() {
//default scheduler is FIFO // temporarily set rootPool name to empty
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO") rootPool = new Pool("", schedulingMode, 0, 0)
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
schedulableBuilder = { schedulableBuilder = {
schedulingMode match { schedulingMode match {
case "FIFO" => case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool) new FIFOSchedulableBuilder(rootPool)
case "FAIR" => case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool) new FairSchedulableBuilder(rootPool)
} }
} }
......
...@@ -63,11 +63,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas ...@@ -63,11 +63,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
} }
override def addSchedulable(schedulable: Schedulable): Unit = { override def addSchedulable(schedulable: Schedulable): Unit = {
//nothing // nothing
} }
override def removeSchedulable(schedulable: Schedulable): Unit = { override def removeSchedulable(schedulable: Schedulable): Unit = {
//nothing // nothing
} }
override def getSchedulableByName(name: String): Schedulable = { override def getSchedulableByName(name: String): Schedulable = {
...@@ -75,7 +75,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas ...@@ -75,7 +75,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
} }
override def executorLost(executorId: String, host: String): Unit = { override def executorLost(executorId: String, host: String): Unit = {
//nothing // nothing
} }
override def checkSpeculatableTasks() = true override def checkSpeculatableTasks() = true
......
...@@ -116,9 +116,9 @@ private[spark] object UIUtils { ...@@ -116,9 +116,9 @@ private[spark] object UIUtils {
<img src="/static/spark_logo.png" /> <img src="/static/spark_logo.png" />
</div> </div>
<div class="span10"> <div class="span10">
<h2 style="vertical-align: bottom; margin-top: 40px; display: inline-block;"> <h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
{title} {title}
</h2> </h3>
</div> </div>
</div> </div>
{content} {content}
......
...@@ -21,7 +21,8 @@ import scala.util.Random ...@@ -21,7 +21,8 @@ import scala.util.Random
import spark.SparkContext import spark.SparkContext
import spark.SparkContext._ import spark.SparkContext._
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** /**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool). * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
* *
...@@ -31,16 +32,31 @@ private[spark] object UIWorkloadGenerator { ...@@ -31,16 +32,31 @@ private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100 val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 500 val INTER_JOB_WAIT_MS = 500
def main(args: Array[String]) { def main(args: Array[String]) {
if (args.length < 2) {
println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
val master = args(0) val master = args(0)
val schedulingMode = SchedulingMode.withName(args(1))
val appName = "Spark UI Tester" val appName = "Spark UI Tester"
if (schedulingMode == SchedulingMode.FAIR) {
System.setProperty("spark.cluster.schedulingmode", "FAIR")
}
val sc = new SparkContext(master, appName) val sc = new SparkContext(master, appName)
// NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase, // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
// but we pass it here anyways since it will be useful once we do. // but we pass it here anyways since it will be useful once we do.
def setName(s: String) = { def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
sc.addLocalProperties("spark.scheduler.cluster.fair.pool", s)
}
sc.addLocalProperties("spark.job.annotation", s) sc.addLocalProperties("spark.job.annotation", s)
} }
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS) val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
def nextFloat() = (new Random()).nextFloat() def nextFloat() = (new Random()).nextFloat()
...@@ -73,14 +89,18 @@ private[spark] object UIWorkloadGenerator { ...@@ -73,14 +89,18 @@ private[spark] object UIWorkloadGenerator {
while (true) { while (true) {
for ((desc, job) <- jobs) { for ((desc, job) <- jobs) {
try { new Thread {
setName(desc) override def run() {
job() try {
println("Job funished: " + desc) setProperties(desc)
} catch { job()
case e: Exception => println("Job funished: " + desc)
println("Job Failed: " + desc) } catch {
} case e: Exception =>
println("Job Failed: " + desc)
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS) Thread.sleep(INTER_JOB_WAIT_MS)
} }
} }
......
...@@ -21,7 +21,6 @@ import java.util.Date ...@@ -21,7 +21,6 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.HashSet
import scala.Some import scala.Some
import scala.xml.{NodeSeq, Node} import scala.xml.{NodeSeq, Node}
...@@ -32,10 +31,9 @@ import spark.ui.Page._ ...@@ -32,10 +31,9 @@ import spark.ui.Page._
import spark.ui.UIUtils._ import spark.ui.UIUtils._
import spark.Utils import spark.Utils
/** Page showing list of all ongoing and recently finished stages */ /** Page showing list of all ongoing and recently finished stages and pools */
private[spark] class IndexPage(parent: JobProgressUI) { private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener def listener = parent.listener
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = { def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq val activeStages = listener.activeStages.toSeq
...@@ -48,25 +46,11 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -48,25 +46,11 @@ private[spark] class IndexPage(parent: JobProgressUI) {
activeTime += t.timeRunning(now) activeTime += t.timeRunning(now)
} }
/** Special table which merges two header cells. */ val activeStagesTable = new StageTable(activeStages, parent)
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { val completedStagesTable = new StageTable(completedStages, parent)
<table class="table table-bordered table-striped table-condensed sortable"> val failedStagesTable = new StageTable(failedStages, parent)
<thead>
<th>Stage Id</th>
<th>Origin</th>
<th>Submitted</th>
<th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Stored RDD</th>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
}
val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
val summary: NodeSeq = val summary: NodeSeq =
<div> <div>
<ul class="unstyled"> <ul class="unstyled">
...@@ -86,78 +70,23 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -86,78 +70,23 @@ private[spark] class IndexPage(parent: JobProgressUI) {
{Utils.memoryBytesToString(listener.totalShuffleWrite)} {Utils.memoryBytesToString(listener.totalShuffleWrite)}
</li> </li>
} }
<li><strong>Active Stages Number:</strong> {activeStages.size} </li>
<li><strong>Completed Stages Number:</strong> {completedStages.size} </li>
<li><strong>Failed Stages Number:</strong> {failedStages.size} </li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
</ul> </ul>
</div> </div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
val content = summary ++ val content = summary ++
<h2>Active Stages</h2> ++ activeStageTable ++ <h3>Pools </h3> ++ poolTable.toNodeSeq ++
<h2>Completed Stages</h2> ++ completedStageTable ++ <h3>Active Stages : {activeStages.size}</h3> ++
<h2>Failed Stages</h2> ++ failedStageTable activeStagesTable.toNodeSeq++
<h3>Completed Stages : {completedStages.size}</h3> ++
completedStagesTable.toNodeSeq++
<h3>Failed Stages : {failedStages.size}</h3> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs) headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
} }
/**
 * Describes how long a stage has been running (or ran).
 *
 * @param submitted time the stage was submitted, if one was recorded
 * @param completed time to measure up to
 * @return `completed - submitted` rendered by `parent.formatDuration`, or "Unknown" when
 *         no submission time is available
 */
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
  val elapsed = submitted.map(start => parent.formatDuration(completed - start))
  elapsed.getOrElse("Unknown")
}
/**
 * Renders a two-segment progress bar.
 *
 * The first segment's width is the percentage of `completed` out of `total`; the second
 * segment's width is the percentage of `started` out of `total`.
 *
 * @param started   value used for the width of the second ("bar-info") segment
 * @param completed value used for the width of the first segment
 * @param total     denominator for both percentages; when it is not positive both
 *                  segments are rendered with zero width
 * @return the `<div class="progress">` markup
 */
def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
  // Dividing a Double by zero yields NaN or Infinity rather than throwing, which would
  // end up in the style attribute as an invalid width such as "width: NaN%".
  def percentOf(count: Int): Double =
    if (total > 0) (count.toDouble / total) * 100 else 0.0

  val completeWidth = "width: %s%%".format(percentOf(completed))
  val startWidth = "width: %s%%".format(percentOf(started))

  <div class="progress" style="height: 15px; margin-bottom: 0px">
    <div class="bar" style={completeWidth}></div>
    <div class="bar bar-info" style={startWidth}></div>
  </div>
}
/**
 * Builds the table row (`<tr>`) for a single stage.
 *
 * Cells, in order: stage id; stage name linked to `/stages/stage?id=<id>`; submission
 * time; elapsed time; a progress bar; the "completed / total" task count, followed by a
 * "(N failed)" note when the listener reports failed tasks; shuffle read; shuffle write;
 * and a link to `/storage/rdd?id=<rdd id>` when the stage's RDD has a storage level other
 * than `StorageLevel.NONE`.
 *
 * @param s the stage to render
 * @return the row markup for the stage
 */
def stageRow(s: Stage): Seq[Node] = {
// Submission time formatted with dateFmt, or "Unknown" when the stage has none.
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
// Shuffle read for this stage; a missing entry defaults to 0, and 0 is shown as blank.
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
// Shuffle write for this stage, with the same default and blank-on-zero handling.
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
// Despite the name, this is the size of the listener's active-task set for the stage
// (empty set when the stage has no entry).
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
// Completed-task count from the listener, 0 when the stage has no entry.
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
// The stage's partition count is used as the total number of tasks.
val totalTasks = s.numPartitions
// NOTE: no comments are placed inside the XML literal below because they would be
// emitted as text. In the markup, the elapsed-time cell measures up to the current time
// when the stage has no completion time, and the `case _ =>` / else-less `if` branches
// evaluate to Unit, which is presumably rendered as nothing -- confirm against scala.xml.
<tr>
<td>{s.id}</td>
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
<td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
}}
</td>
<td>{shuffleRead}</td>
<td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)}
</a>
}}
</td>
</tr>
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment