diff --git a/assembly/pom.xml b/assembly/pom.xml
index 1382539f2459c6c9224770f5e989e5d2af8ae765..cc5a4875af868aad4823ad5797a9f9bf0ca52063 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -62,43 +62,31 @@
       <groupId>org.spark-project</groupId>
       <artifactId>spark-core</artifactId>
       <classifier>${classifier.name}</classifier>
-      <version>0.8.0-SNAPSHOT</version>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.spark-project</groupId>
       <artifactId>spark-bagel</artifactId>
       <classifier>${classifier.name}</classifier>
-      <version>0.8.0-SNAPSHOT</version>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.spark-project</groupId>
-      <artifactId>spark-examples</artifactId>
+      <artifactId>spark-mllib</artifactId>
       <classifier>${classifier.name}</classifier>
-      <version>0.8.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.spark-project</groupId>
-      <artifactId>spark-examples</artifactId>
-      <classifier>javadoc</classifier>
-      <version>0.8.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.spark-project</groupId>
-      <artifactId>spark-examples</artifactId>
-      <classifier>sources</classifier>
-      <version>0.8.0-SNAPSHOT</version>
+      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.spark-project</groupId>
      <artifactId>spark-repl</artifactId>
      <classifier>${classifier.name}</classifier>
-      <version>0.8.0-SNAPSHOT</version>
+      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.spark-project</groupId>
      <artifactId>spark-streaming</artifactId>
      <classifier>${classifier.name}</classifier>
-      <version>0.8.0-SNAPSHOT</version>
+      <version>${project.version}</version>
    </dependency>
  </dependencies>
</project>
\ No newline at end of file
diff --git a/assembly/src/main/assembly/assembly.xml b/assembly/src/main/assembly/assembly.xml
index dd05f35f1fb10e4e000dc434002834c3e016d113..14485b7181e90ad901548c0e4ff96c41b6c2a6ba 100644
--- a/assembly/src/main/assembly/assembly.xml
+++ b/assembly/src/main/assembly/assembly.xml
@@ -49,7 +49,7 @@
         <include>org.spark-project:*:jar</include>
       </includes>
       <excludes>
-        <exclude>org.spark-project:spark-dist:jar</exclude>
+        <exclude>org.spark-project:spark-assembly:jar</exclude>
       </excludes>
     </dependencySet>
     <dependencySet>
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77cb0ee0cd6b06d1cc286023030e059381bb7594..0d1f9fa8d4f125ee6d03a9e8e0a821220ae9f5b3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.util.DynamicVariable
 import scala.collection.mutable.{ConcurrentMap, HashMap}
@@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
+  SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+  ClusterScheduler, Schedulable, SchedulingMode}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
@@ -574,6 +577,25 @@ class SparkContext(
     env.blockManager.master.getStorageStatus
   }
 
+  /**
+   * Return the pools of the fair scheduler
+   * TODO(xiajunluan): nested pools are not taken into account yet
+   */
+  def getPools: ArrayBuffer[Schedulable] = {
+    taskScheduler.rootPool.schedulableQueue
+  }
+
+  /**
+   * Return the current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
+  def getPoolNameToPool: HashMap[String, Schedulable] = {
+    taskScheduler.rootPool.schedulableNameToSchedulable
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
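The three accessors above give the web UI read-only visibility into the scheduler state. A quick sketch of how they could be exercised from application code — this is illustrative, not part of the patch; it assumes an already-constructed SparkContext `sc` with the fair scheduler enabled, and uses the `name`, `minShare`, `weight` and `runningTasks` members of the Schedulable trait touched later in this diff:

```scala
// Hypothetical probe of the new SparkContext accessors.
println("scheduling mode: " + sc.getSchedulingMode)    // FIFO or FAIR
for (pool <- sc.getPools) {                            // top-level pools only (no nesting yet)
  println("pool %s: minShare=%d, weight=%d, running=%d".format(
    pool.name, pool.minShare, pool.weight, pool.runningTasks))
}
```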
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index 557df89b41fd52ca602d9e05ef3cedf23d8f17b3..79fdb210246eca77d46597479a169bec1e72b1d2 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -25,17 +25,25 @@ import akka.dispatch.Await
 import akka.pattern.ask
 import akka.util.duration._
 
+import net.liftweb.json.JsonAST.JValue
+
 import spark.Utils
 import spark.deploy.DeployWebUI
 import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import spark.deploy.JsonProtocol
 import spark.deploy.master.{ApplicationInfo, WorkerInfo}
 import spark.ui.UIUtils
-
 private[spark] class IndexPage(parent: MasterWebUI) {
   val master = parent.master
   implicit val timeout = parent.timeout
 
+  def renderJson(request: HttpServletRequest): JValue = {
+    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+    val state = Await.result(stateFuture, 30 seconds)
+    JsonProtocol.writeMasterState(state)
+  }
+
   /** Index view listing applications and executors */
   def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index dabc2d8dc765378331c386da649a8fcbfde0d4c5..31bdb7854e4977259c32869f304a89d484b08efe 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -61,6 +61,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
     ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
     ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
     ("*", (request: HttpServletRequest) => indexPage.render(request))
   )
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9b45fc29388eaba971624bd031b891abbeac2de5..89c51a44c98790c479dbf0260b563189770295bf 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -510,6 +510,12 @@ class DAGScheduler(
         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
       }
     }
+    // The listener must be notified before a possible NotSerializableException is thrown,
+    // so that "StageSubmitted" is always posted before "JobEnded"
+    val properties = idToActiveJob(stage.priority).properties
+    sparkListeners.foreach(_.onStageSubmitted(
+      SparkListenerStageSubmitted(stage, tasks.size, properties)))
+
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
@@ -524,11 +530,9 @@
         return
       }
 
-      sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-      val properties = idToActiveJob(stage.priority).properties
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
       if (!stage.submissionTime.isDefined) {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index f7565b8c57e14c503da6fcca0af23afa868c88bb..c7e8f8a9a1d067647e17decd3006e14c0fbd07fb 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -62,7 +62,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     event match {
       case SparkListenerJobStart(job, properties) =>
         processJobStartEvent(job, properties)
-      case SparkListenerStageSubmitted(stage, taskSize) =>
+      case SparkListenerStageSubmitted(stage, taskSize, properties) =>
         processStageSubmittedEvent(stage, taskSize)
       case StageCompleted(stageInfo) =>
         processStageCompletedEvent(stageInfo)
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4eb7e4e6a5f8329b2e5df1466a93451a8dd66ee5..2a09a956ad05788b8fbfbdbd67c8b080151cc379 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
 
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+  extends SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
@@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
 case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) extends SparkListenerEvents
 
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
   extends SparkListenerEvents
 
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
   extends SparkListenerEvents
 
 trait SparkListener {
@@ -45,7 +46,7 @@ trait SparkListener {
    * Called when a stage is completed, with information on the completed stage
    */
   def onStageCompleted(stageCompleted: StageCompleted) { }
-  
+
   /**
    * Called when a stage is submitted
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
@@ -65,12 +66,12 @@
    * Called when a job starts
    */
   def onJobStart(jobStart: SparkListenerJobStart) { }
-  
+
   /**
    * Called when a job ends
    */
   def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-  
+
 }
 
 /**
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index 51883080069f34eb7cf54c5a7feab1b973ef8356..4943d58e254034f43c2f4ce6b305455b5cf40e1a 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@
 
 package spark.scheduler
 
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
  * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@@ -25,6 +27,11 @@ package spark.scheduler
  * the TaskSchedulerListener interface.
 */
 private[spark] trait TaskScheduler {
+
+  def rootPool: Pool
+
+  def schedulingMode: SchedulingMode
+
   def start(): Unit
 
   // Invoked after system has successfully initialized (typically in spark context).
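Adding `rootPool` and `schedulingMode` to the trait means every TaskScheduler implementation — including test doubles — must now provide them. A minimal sketch of a conforming stub, modeled on the DAGSchedulerSuite change further down; the remaining members (`setListener`, `defaultParallelism`) are assumed from the existing trait and may differ in detail:

```scala
import spark.scheduler.{TaskScheduler, TaskSchedulerListener, TaskSet}
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode

// Illustrative no-op implementation of the extended trait.
class NoOpTaskScheduler extends TaskScheduler {
  override def rootPool: Pool = null                               // no pool hierarchy
  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
  override def start() {}
  override def stop() {}
  override def submitTasks(taskSet: TaskSet) {}                    // drop all task sets
  override def setListener(listener: TaskSchedulerListener) {}
  override def defaultParallelism(): Int = 1
}
```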
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7c10074dc761fad68116bda60c287dcb7bed0358..d92dd3de8b5eeeb2f50908b12e3757c16113c57f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
 import spark._
 import spark.TaskState.TaskState
 import spark.scheduler._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}
@@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  // default scheduler is FIFO
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
 
   override def setListener(listener: TaskSchedulerListener) {
     this.listener = listener
   }
@@ -121,15 +125,13 @@
   def initialize(context: SchedulerBackend) {
     backend = context
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
-    //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
@@ -270,10 +272,12 @@
     }
     var launchedTask = false
     val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (manager <- sortedTaskSetQueue)
-    {
-      logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+
+    for (manager <- sortedTaskSetQueue) {
+      logDebug("parentName:%s, name:%s, runningTasks:%s".format(
+        manager.parent.name, manager.name, manager.runningTasks))
     }
+
     for (manager <- sortedTaskSetQueue) {
 
       // Split offers based on node local, rack local and off-rack tasks.
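Since `schedulingMode` is now a `val` initialized from `spark.cluster.schedulingmode` when the scheduler is constructed, the property has to be set before the SparkContext exists. A minimal sketch (master URL is illustrative):

```scala
import spark.SparkContext

object FairModeExample {
  def main(args: Array[String]) {
    // Read once at scheduler construction time, so set it before new SparkContext(...).
    System.setProperty("spark.cluster.schedulingmode", "FAIR")
    val sc = new SparkContext("local[2]", "FairModeExample")
    println(sc.getSchedulingMode)  // expected: FAIR
    sc.stop()
  }
}
```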
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index bbf234febd7ddd9ee903c463daef86e678de786b..d2110bd098e59b3963ff0819235f15c1ab5d0d3e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -85,7 +85,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
 
   // Maximum times a task is allowed to fail before failing the job
-  val MAX_TASK_FAILURES = 4
+  val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
 
   // Quantile of tasks at which to start speculation
   val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@@ -107,9 +107,8 @@
   var runningTasks = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
-  var name = "TaskSet_" + taskSet.stageId.toString
+  var name = "TaskSet_"+taskSet.stageId.toString
   var parent: Schedulable = null
-
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -697,18 +696,18 @@
     }
   }
 
-  // TODO: for now we just find Pool not TaskSetManager,
+  // TODO(xiajunluan): for now we just find Pool not TaskSetManager
   // we can extend this function in future if needed
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
 
   override def addSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def removeSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index f557b142c4eb7861b64626bc3e89b194d245e587..e77e8e4162e1a435d33a6a84117920b71d60ba74 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -17,14 +17,18 @@
 
 package spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
+import scala.collection.mutable.ArrayBuffer
 /**
  * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
   var parent: Schedulable
+  // child queues
+  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
   def runningTasks: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 95554023c02c0efcbb6b7fe063a7bc13075372df..b2d089f31d9d01c0399006cb8b9e54ceb536003d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
   def addTaskSetManager(manager: Schedulable, properties: Properties)
 }
 
-private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
 
   override def buildPools() {
-    //nothing
+    // nothing
   }
 
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@@ -52,7 +53,8 @@
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
 
   val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@@ -103,9 +105,10 @@
     }
   }
 
-    //finally create "default" pool
+    // finally create the "default" pool
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
-      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
       logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
         DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@@ -119,8 +122,10 @@
       poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
       parentPool = rootPool.getSchedulableByName(poolName)
       if (parentPool == null) {
-        //we will create a new pool that user has configured in app instead of being defined in xml file
-        parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+        // we will create a new pool that the user has configured in the app,
+        // instead of it being defined in the xml file
+        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
+          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
         logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
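Putting FairSchedulableBuilder to work: pools either come from the allocation file named by `spark.fairscheduler.allocation.file`, or are created on demand with the default minShare/weight when a job names an unknown pool. A hedged sketch of the user-facing wiring (the file path and pool name are placeholders; property keys are the ones defined above):

```scala
System.setProperty("spark.cluster.schedulingmode", "FAIR")
// Optional; without it, pools named by jobs are created with default settings.
System.setProperty("spark.fairscheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new spark.SparkContext("local[4]", "pool demo")
// FAIR_SCHEDULER_PROPERTIES above is the key consulted when a task set is added.
sc.addLocalProperties("spark.scheduler.cluster.fair.pool", "production")
sc.parallelize(1 to 1000).count()  // this job is scheduled in the "production" pool
```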
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 4b3e3e50e13babcccd854c5aadcd0783b75db34d..55cdf4791f985725d7bf05f431da100ba281bb60 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -17,8 +17,13 @@
 
 package spark.scheduler.cluster
 
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+/**
+ * "FAIR" and "FIFO" determine which policy is used to order tasks amongst
+ * a Schedulable's sub-queues. "NONE" is used when a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
 
   type SchedulingMode = Value
-  val FAIR,FIFO = Value
+  val FAIR, FIFO, NONE = Value
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 7978a5df7464d316f195b3423983130af1c192a7..1a92a5ed6fa06bb1a2dcf212db1c7de99f7e402d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -23,7 +23,10 @@ import spark.TaskState.TaskState
 import spark.scheduler.TaskSet
 
 private[spark] trait TaskSetManager extends Schedulable {
-
+  def schedulableQueue = null
+
+  def schedulingMode = SchedulingMode.NONE
+
   def taskSet: TaskSet
 
   def slaveOffer(
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index edd83d4cb49639c236d0df50ee33fe66fc6b4d4d..bb0c836e86b2a39c6c0f39082806b77ce3265971 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -29,6 +29,7 @@ import spark.TaskState.TaskState
 import spark.executor.ExecutorURLClassLoader
 import spark.scheduler._
 import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import akka.actor._
 
 /**
@@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
   val activeTaskSets = new HashMap[String, TaskSetManager]
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -92,15 +95,13 @@
   var localActor: ActorRef = null
 
   override def start() {
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
-    //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
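The names passed to `Enumeration(...)` are exactly what `withName` resolves, which is how the `spark.cluster.schedulingmode` strings map onto the new enum values. A small sanity sketch:

```scala
import spark.scheduler.cluster.SchedulingMode

val mode = SchedulingMode.withName("FAIR")
assert(mode == SchedulingMode.FAIR)
// Any other string throws NoSuchElementException, so the property must be exactly
// "FIFO" or "FAIR"; "NONE" marks leaf Schedulables (TaskSetManagers) rather than
// a user-settable mode.
```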
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index b29740c886e572b7f82413dce53097fe710ec1d4..4ab15532cf8ac03ed2c5dc386fc5ad39498e0181 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -63,11 +63,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   override def addSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   override def removeSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   override def getSchedulableByName(name: String): Schedulable = {
@@ -75,7 +75,7 @@
   }
 
   override def executorLost(executorId: String, host: String): Unit = {
-    //nothing
+    // nothing
   }
 
   override def checkSpeculatableTasks() = true
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index 18a3bf534c9d81709e2ae5414505f10e0e71a176..cff26d3168b334c5535768838a522f29e92d9f9b 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -116,9 +116,9 @@ private[spark] object UIUtils {
             <img src="/static/spark_logo.png" />
           </div>
           <div class="span10">
-            <h2 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
+            <h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
               {title}
-            </h2>
+            </h3>
           </div>
         </div>
         {content}
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index a80e2d7002689873849b7e510f3bda9fd5ac7d24..4fbb503e5ce7573a03644d9c88bbb43021ab398f 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,8 @@ import scala.util.Random
 
 import spark.SparkContext
 import spark.SparkContext._
-
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
  *
@@ -31,16 +32,31 @@ private[spark] object UIWorkloadGenerator {
   val NUM_PARTITIONS = 100
   val INTER_JOB_WAIT_MS = 500
 
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      System.exit(1)
+    }
     val master = args(0)
+    val schedulingMode = SchedulingMode.withName(args(1))
     val appName = "Spark UI Tester"
+
+    if (schedulingMode == SchedulingMode.FAIR) {
+      System.setProperty("spark.cluster.schedulingmode", "FAIR")
+    }
     val sc = new SparkContext(master, appName)
 
     // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
     // but we pass it here anyways since it will be useful once we do.
-    def setName(s: String) = {
+    def setProperties(s: String) = {
+      if (schedulingMode == SchedulingMode.FAIR) {
+        sc.addLocalProperties("spark.scheduler.cluster.fair.pool", s)
+      }
       sc.addLocalProperties("spark.job.annotation", s)
     }
+
     val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
 
     def nextFloat() = (new Random()).nextFloat()
@@ -73,14 +89,18 @@
 
     while (true) {
       for ((desc, job) <- jobs) {
-        try {
-          setName(desc)
-          job()
-          println("Job funished: " + desc)
-        } catch {
-          case e: Exception =>
-            println("Job Failed: " + desc)
-        }
+        new Thread {
+          override def run() {
+            try {
+              setProperties(desc)
+              job()
+              println("Job finished: " + desc)
+            } catch {
+              case e: Exception =>
+                println("Job Failed: " + desc)
+            }
+          }
+        }.start()
         Thread.sleep(INTER_JOB_WAIT_MS)
       }
     }
   }
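The generator now runs each job on its own thread before setting the pool property, which suggests local properties are tracked per submitting thread — an assumption the sketch below relies on. That is what lets concurrent jobs land in different pools:

```scala
// Illustrative: two concurrent jobs targeting different fair-scheduler pools.
// Assumes a running SparkContext `sc` with the fair scheduler enabled.
def runInPool(pool: String)(body: => Unit) = new Thread {
  override def run() {
    sc.addLocalProperties("spark.scheduler.cluster.fair.pool", pool)  // per-thread property
    body
  }
}.start()

runInPool("pool_a") { sc.parallelize(1 to 1000).count() }
runInPool("pool_b") { sc.parallelize(1 to 1000).map(_ * 2).count() }
```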
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index a843b5ea2f88d6f8b8b62a42399986e1171fb6a2..4ad787565df86917c4516c0f2ab3bb887c22ac85 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -21,7 +21,6 @@ import java.util.Date
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.HashSet
 import scala.Some
 import scala.xml.{NodeSeq, Node}
 
@@ -32,10 +31,9 @@
 import spark.ui.Page._
 import spark.ui.UIUtils._
 import spark.Utils
 
-/** Page showing list of all ongoing and recently finished stages */
+/** Page showing list of all ongoing and recently finished stages and pools */
 private[spark] class IndexPage(parent: JobProgressUI) {
   def listener = parent.listener
-  val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val activeStages = listener.activeStages.toSeq
@@ -48,25 +46,11 @@
       activeTime += t.timeRunning(now)
     }
 
-    /** Special table which merges two header cells. */
-    def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
-      <table class="table table-bordered table-striped table-condensed sortable">
-        <thead>
-          <th>Stage Id</th>
-          <th>Origin</th>
-          <th>Submitted</th>
-          <th>Duration</th>
-          <th colspan="2">Tasks: Complete/Total</th>
-          <th>Shuffle Read</th>
-          <th>Shuffle Write</th>
-          <th>Stored RDD</th>
-        </thead>
-        <tbody>
-          {rows.map(r => makeRow(r))}
-        </tbody>
-      </table>
-    }
+    val activeStagesTable = new StageTable(activeStages, parent)
+    val completedStagesTable = new StageTable(completedStages, parent)
+    val failedStagesTable = new StageTable(failedStages, parent)
 
+    val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
     val summary: NodeSeq =
       <div>
         <ul class="unstyled">
@@ -86,78 +70,23 @@
             {Utils.memoryBytesToString(listener.totalShuffleWrite)}
           </li>
         }
+        <li><strong>Active Stages Number:</strong> {activeStages.size} </li>
+        <li><strong>Completed Stages Number:</strong> {completedStages.size} </li>
+        <li><strong>Failed Stages Number:</strong> {failedStages.size} </li>
+        <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
+      </ul>
     </div>
 
-    val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
-    val completedStageTable = stageTable(stageRow, completedStages)
-    val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
 
-    val content = summary ++
-      <h2>Active Stages</h2> ++ activeStageTable ++
-      <h2>Completed Stages</h2> ++ completedStageTable ++
-      <h2>Failed Stages</h2> ++ failedStageTable
+    val content = summary ++
+      <h3>Pools</h3> ++ poolTable.toNodeSeq ++
+      <h3>Active Stages: {activeStages.size}</h3> ++
+      activeStagesTable.toNodeSeq ++
+      <h3>Completed Stages: {completedStages.size}</h3> ++
+      completedStagesTable.toNodeSeq ++
+      <h3>Failed Stages: {failedStages.size}</h3> ++
+      failedStagesTable.toNodeSeq
 
     headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
   }
-
-  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
-    submitted match {
-      case Some(t) => parent.formatDuration(completed - t)
-      case _ => "Unknown"
-    }
-  }
-
-  def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
-    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
-
-    <div class="progress" style="height: 15px; margin-bottom: 0px">
-      <div class="bar" style={completeWidth}></div>
-      <div class="bar bar-info" style={startWidth}></div>
-    </div>
-  }
-
-
-  def stageRow(s: Stage): Seq[Node] = {
-    val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
-      case None => "Unknown"
-    }
-
-    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.memoryBytesToString(b)
-    }
-    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.memoryBytesToString(b)
-    }
-
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
-    val totalTasks = s.numPartitions
-
-    <tr>
-      <td>{s.id}</td>
-      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
-      <td>{submissionTime}</td>
-      <td>{getElapsedTime(s.submissionTime,
-           s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
-      <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
-      <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
-        {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
-          case f if f > 0 => "(%s failed)".format(f)
-          case _ =>
-        }}
-      </td>
-      <td>{shuffleRead}</td>
-      <td>{shuffleWrite}</td>
-      <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
-            <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
-              {Option(s.rdd.name).getOrElse(s.rdd.id)}
-            </a>
-          }}
-      </td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000000000000000000000000000000000000..200e13cf99c1fcb977b70a7e3bc75d5ef01a8065
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,159 @@
+package spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+  // How many stages to remember
+  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+  val DEFAULT_POOL_NAME = "default"
+
+  val stageToPool = new HashMap[Stage, String]()
+  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
+
+  val activeStages = HashSet[Stage]()
+  val completedStages = ListBuffer[Stage]()
+  val failedStages = ListBuffer[Stage]()
+
+  // Total metrics reflect metrics only for completed tasks
+  var totalTime = 0L
+  var totalShuffleRead = 0L
+  var totalShuffleWrite = 0L
+
+  val stageToTime = HashMap[Int, Long]()
+  val stageToShuffleRead = HashMap[Int, Long]()
+  val stageToShuffleWrite = HashMap[Int, Long]()
+  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
+  val stageToTasksComplete = HashMap[Int, Int]()
+  val stageToTasksFailed = HashMap[Int, Int]()
+  val stageToTaskInfos =
+    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+  override def onStageCompleted(stageCompleted: StageCompleted) = {
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  def trimIfNecessary(stages: ListBuffer[Stage]) {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
+      })
+      stages.trimEnd(toRemove)
+    }
+  }
+
+  /** For FIFO, all stages are placed in a single "default" pool; the pool name is meaningless there */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+    val stage = stageSubmitted.stage
+    activeStages += stage
+    var poolName = DEFAULT_POOL_NAME
+    if (stageSubmitted.properties != null) {
+      poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool",
+        DEFAULT_POOL_NAME)
+    }
+    stageToPool(stage) = poolName
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) {
+    val sid = taskStart.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskStart.taskInfo, None, None))
+    stageToTaskInfos(sid) = taskList
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val sid = taskEnd.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
+      }
+
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList -= ((taskEnd.taskInfo, None, None))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+
+  /** Is this stage's input from a shuffle read. */
+  def hasShuffleRead(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+
+  /** Is this stage's output to a shuffle write. */
+  def hasShuffleWrite(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+}
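JobProgressListener is wired in via `sc.addSparkListener` (see `JobProgressUI.start()` just below). The same hook works for user-defined listeners, and the new `properties` field makes the pool visible to them. An illustrative sketch, not part of this patch:

```scala
import java.util.Properties
import spark.scheduler._

// Logs the fair-scheduler pool each stage is submitted to.
class PoolLoggingListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    val props = Option(stageSubmitted.properties).getOrElse(new Properties)
    val pool = props.getProperty("spark.scheduler.cluster.fair.pool", "default")
    println("stage %d (%d tasks) -> pool %s".format(
      stageSubmitted.stage.id, stageSubmitted.taskSize, pool))
  }
}
// sc.addSparkListener(new PoolLoggingListener)  // register on an existing SparkContext
```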
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 09d24b6302047746d4bdf5a1339a91033d2d5718..3832c5d33c631089fc3929e4a8d661ed9b5efd43 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -31,9 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
 import spark.ui.JettyUtils._
 import spark.{ExceptionFailure, SparkContext, Success, Utils}
 import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
 import collection.mutable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -41,11 +41,25 @@
   def listener = _listener.get
   val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
+
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)
+  private val poolPage = new PoolPage(this)
+
+  var stagePoolInfo: StagePoolInfo = null
+  var stagePagePoolSource: PoolSource = null
 
   def start() {
-    _listener = Some(new JobProgressListener)
+    _listener = Some(new JobProgressListener(sc))
+    sc.getSchedulingMode match {
+      case SchedulingMode.FIFO =>
+        stagePoolInfo = new FIFOStagePoolInfo()
+        stagePagePoolSource = new FIFOSource()
+      case SchedulingMode.FAIR =>
+        stagePoolInfo = new FairStagePoolInfo(listener)
+        stagePagePoolSource = new FairSource(sc)
+    }
+
     sc.addSparkListener(listener)
   }
 
@@ -53,120 +67,7 @@
   def getHandlers = Seq[(String, Handler)](
     ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
     ("/stages", (request: HttpServletRequest) => indexPage.render(request))
   )
 }
-
-private[spark] class JobProgressListener extends SparkListener {
-  // How many stages to remember
-  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-
-  val activeStages = HashSet[Stage]()
-  val completedStages = ListBuffer[Stage]()
-  val failedStages = ListBuffer[Stage]()
-
-  // Total metrics reflect metrics only for completed tasks
-  var totalTime = 0L
-  var totalShuffleRead = 0L
-  var totalShuffleWrite = 0L
-
-  val stageToTime = HashMap[Int, Long]()
-  val stageToShuffleRead = HashMap[Int, Long]()
-  val stageToShuffleWrite = HashMap[Int, Long]()
-  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
-  val stageToTasksComplete = HashMap[Int, Int]()
-  val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos =
-    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
-  override def onJobStart(jobStart: SparkListenerJobStart) {}
-
-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    val stage = stageCompleted.stageInfo.stage
-    activeStages -= stage
-    completedStages += stage
-    trimIfNecessary(completedStages)
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) {
-    if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-        stageToTime.remove(s.id)
-        stageToShuffleRead.remove(s.id)
-        stageToShuffleWrite.remove(s.id)
-        stageToTasksActive.remove(s.id)
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
-      })
-      stages.trimEnd(toRemove)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
-    activeStages += stageSubmitted.stage
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) {
-    val sid = taskStart.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive += taskStart.taskInfo
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskStart.taskInfo, None, None))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val sid = taskEnd.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive -= taskEnd.taskInfo
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
-      }
-
-    stageToTime.getOrElseUpdate(sid, 0L)
-    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageToTime(sid) += time
-    totalTime += time
-
-    stageToShuffleRead.getOrElseUpdate(sid, 0L)
-    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-      s.remoteBytesRead).getOrElse(0L)
-    stageToShuffleRead(sid) += shuffleRead
-    totalShuffleRead += shuffleRead
-
-    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-      s.shuffleBytesWritten).getOrElse(0L)
-    stageToShuffleWrite(sid) += shuffleWrite
-    totalShuffleWrite += shuffleWrite
-
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList -= ((taskEnd.taskInfo, None, None))
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
-          case _ =>
-        }
-      case _ =>
-    }
-  }
-}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000000000000000000000000000000000000..37d4f8fa6bea57e05e638bbf51df8ef937da62f1
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,31 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing specific pool details */
+private[spark] class PoolPage(parent: JobProgressUI) {
+  def listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
request.getParameter("poolname") + val poolToActiveStages = listener.poolToActiveStages + val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq + + val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName) + val poolTable = new PoolTable(poolDetailPoolSource, listener) + + val activeStagesTable = new StageTable(activeStages, parent) + + val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ + <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() + + headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs) + } +} diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala new file mode 100644 index 0000000000000000000000000000000000000000..8788ed8bc151092d16eaaaf366a51870fefac17c --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala @@ -0,0 +1,95 @@ +package spark.ui.jobs + +import java.util.Date + +import javax.servlet.http.HttpServletRequest + +import scala.Some +import scala.xml.{NodeSeq, Node} +import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet + +import spark.SparkContext +import spark.scheduler.Stage +import spark.ui.UIUtils._ +import spark.ui.Page._ +import spark.storage.StorageLevel +import spark.scheduler.cluster.Schedulable + +/* + * Interface for get pools seq showing on Index or pool detail page + */ + +private[spark] trait PoolSource { + def getPools: Seq[Schedulable] +} + +/* + * Pool source for FIFO scheduler algorithm on Index page + */ +private[spark] class FIFOSource() extends PoolSource { + def getPools: Seq[Schedulable] = { + Seq[Schedulable]() + } +} + +/* + * Pool source for Fair scheduler algorithm on Index page + */ +private[spark] class FairSource(sc: SparkContext) extends PoolSource { + def getPools: Seq[Schedulable] = { + sc.getPools.toSeq + } +} + +/* + * specific pool info for pool detail page + */ +private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource { + def getPools: Seq[Schedulable] = { + val pools = HashSet[Schedulable]() + pools += sc.getPoolNameToPool(poolName) + pools.toSeq + } +} + +/** Table showing list of pools */ +private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) { + + var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages + + def toNodeSeq(): Seq[Node] = { + poolTable(poolRow, poolSource.getPools) + } + + // pool tables + def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], + rows: Seq[Schedulable] + ): Seq[Node] = { + <table class="table table-bordered table-striped table-condensed sortable"> + <thead> + <th>Pool Name</th> + <th>Minimum Share</th> + <th>Pool Weight</th> + <td>Active Stages</td> + <td>Running Tasks</td> + <td>SchedulingMode</td> + </thead> + <tbody> + {rows.map(r => makeRow(r, poolToActiveStages))} + </tbody> + </table> + } + + def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = { + <tr> + <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td> + <td>{p.minShare}</td> + <td>{p.weight}</td> + <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td> + <td>{p.runningTasks}</td> + <td>{p.schedulingMode}</td> + </tr> + } +} + diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala new file mode 100644 index 
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e18b70f0b93a2809e7e9f30ca0d371478a4eec7b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,149 @@
+package spark.ui.jobs
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.cluster.TaskInfo
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.Utils
+import spark.storage.StorageLevel
+
+/*
+ * Interface to get a stage's pool name
+ */
+private[spark] trait StagePoolInfo {
+  def getStagePoolName(s: Stage): String
+
+  def hasHref: Boolean
+}
+
+/*
+ * For the FIFO scheduler, just show "N/A" and do not link to a pool detail page
+ */
+private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
+  def getStagePoolName(s: Stage): String = "N/A"
+
+  def hasHref: Boolean = false
+}
+
+/*
+ * For the fair scheduler, show the stage's pool name and link to the pool detail page
+ */
+private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
+  def getStagePoolName(s: Stage): String = {
+    listener.stageToPool(s)
+  }
+
+  def hasHref: Boolean = true
+}
+
+/** Table showing a list of ongoing and recently finished stages */
+private[spark] class StageTable(
+  val stages: Seq[Stage],
+  val parent: JobProgressUI) {
+
+  val listener = parent.listener
+  val dateFmt = parent.dateFmt
+  var stagePoolInfo = parent.stagePoolInfo
+
+  def toNodeSeq(): Seq[Node] = {
+    stageTable(stageRow, stages)
+  }
+
+  /** Special table which merges two header cells. */
+  def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Stage Id</th>
+        <th>Pool Name</th>
+        <th>Origin</th>
+        <th>Submitted</th>
+        <th>Duration</th>
+        <th colspan="2">Tasks: Complete/Total</th>
+        <th>Shuffle Read</th>
+        <th>Shuffle Write</th>
+        <th>Stored RDD</th>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+    submitted match {
+      case Some(t) => parent.formatDuration(completed - t)
+      case _ => "Unknown"
+    }
+  }
+
+  def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
+    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+    <div class="progress" style="height: 15px; margin-bottom: 0px">
+      <div class="bar" style={completeWidth}></div>
+      <div class="bar bar-info" style={startWidth}></div>
+    </div>
+  }
+
+
+  def stageRow(s: Stage): Seq[Node] = {
+    val submissionTime = s.submissionTime match {
+      case Some(t) => dateFmt.format(new Date(t))
+      case None => "Unknown"
+    }
+
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
+    }
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
+    }
+
+    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
+    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+    val totalTasks = s.numPartitions
+
+    val poolName = stagePoolInfo.getStagePoolName(s)
+
+    <tr>
+      <td>{s.id}</td>
href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a> + } else { + {poolName} + }}</td> + <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td> + <td>{submissionTime}</td> + <td>{getElapsedTime(s.submissionTime, + s.completionTime.getOrElse(System.currentTimeMillis()))}</td> + <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td> + <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks} + {listener.stageToTasksFailed.getOrElse(s.id, 0) match { + case f if f > 0 => "(%s failed)".format(f) + case _ => + }} + </td> + <td>{shuffleRead}</td> + <td>{shuffleWrite}</td> + <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) { + <a href={"/storage/rdd?id=%s".format(s.rdd.id)}> + {Option(s.rdd.name).getOrElse(s.rdd.id)} + </a> + }} + </td> + </tr> + } +} diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index a8b88d7936e4f5413d540c5c56c0fa094286aaff..caaf3209fd660f7f88b2b1eace053732d4e8bb51 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -32,6 +32,10 @@ import spark.{Dependency, ShuffleDependency, OneToOneDependency} import spark.{FetchFailed, Success, TaskEndReason} import spark.storage.{BlockManagerId, BlockManagerMaster} +import spark.scheduler.cluster.Pool +import spark.scheduler.cluster.SchedulingMode +import spark.scheduler.cluster.SchedulingMode.SchedulingMode + /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler * rather than spawning an event loop thread as happens in the real code. They use EasyMock @@ -49,6 +53,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() val taskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} override def submitTasks(taskSet: TaskSet) = { diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala index 0f855c38da4f49f7abc3e5e119cc9f1bdf09afd3..bb9e715f95e217bd2c2b7c5f7a3bcbe90c782ae9 100644 --- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala @@ -57,7 +57,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None) val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None) - joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4)) + joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null)) joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName) parentRdd.setName("MyRDD") joblogger.getRddNameTest(parentRdd) should be ("MyRDD") diff --git a/docs/configuration.md b/docs/configuration.md index 0bcd73ca99db717e3ec1d09e9b4d6e011f557354..99624a44aa963f1e17260a679a958b44229dc37e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -310,6 +310,14 @@ Apart from these, the following properties are also available, and may be useful Duration (milliseconds) of how long to batch new objects coming from network receivers. 
diff --git a/docs/configuration.md b/docs/configuration.md
index 0bcd73ca99db717e3ec1d09e9b4d6e011f557354..99624a44aa963f1e17260a679a958b44229dc37e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -310,6 +310,14 @@ Apart from these, the following properties are also available, and may be useful
     Duration (milliseconds) of how long to batch new objects coming from network receivers.
   </td>
 </tr>
+<tr>
+  <td>spark.task.maxFailures</td>
+  <td>4</td>
+  <td>
+    Number of individual task failures before giving up on the job.
+    Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+  </td>
+</tr>
 </table>
diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
index d875d6de50a2018140e328c7b5cc0ed9b7db7f9d..b402c71ed212028b32e120c257e04bfcd29fd6e9 100644
--- a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
@@ -315,14 +315,15 @@ object KMeans {
   }
 
   def main(args: Array[String]) {
-    if (args.length != 4) {
-      println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
+    if (args.length < 4) {
+      println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
       System.exit(1)
     }
     val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
+    val runs = if (args.length >= 5) args(4).toInt else 1
     val sc = new SparkContext(master, "KMeans")
-    val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
-    val model = KMeans.train(data, k, iters)
+    val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
+    val model = KMeans.train(data, k, iters, runs)
     val cost = model.computeCost(data)
     println("Cluster centers:")
     for (c <- model.clusterCenters) {
diff --git a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8f95cf747985c4a48511870dfc48c12e85489d31
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.util.Random
+
+import spark.{RDD, SparkContext}
+
+object KMeansDataGenerator {
+
+  /**
+   * Generate an RDD containing test data for KMeans. This function chooses k cluster centers
+   * from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian
+   * cluster with scale 1 around each center.
+   *
+   * @param sc SparkContext to use for creating the RDD
+   * @param numPoints Number of points that will be contained in the RDD
+   * @param k Number of clusters
+   * @param d Number of dimensions
+   * @param r Scaling factor for the distribution of the initial centers
+   * @param numPartitions Number of partitions of the generated RDD; default 2
+   */
+  def generateKMeansRDD(
+      sc: SparkContext,
+      numPoints: Int,
+      k: Int,
+      d: Int,
+      r: Double,
+      numPartitions: Int = 2)
+    : RDD[Array[Double]] =
+  {
+    // First, generate some centers
+    val rand = new Random(42)
+    val centers = Array.fill(k)(Array.fill(d)(rand.nextGaussian() * r))
+    // Then generate points around each center
+    sc.parallelize(0 until numPoints, numPartitions).map { idx =>
+      val center = centers(idx % k)
+      val rand2 = new Random(42 + idx)
+      Array.tabulate(d)(i => center(i) + rand2.nextGaussian())
+    }
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 6) {
+      println("Usage: KMeansGenerator " +
+        "<master> <output_dir> <num_points> <k> <d> <r> [<num_partitions>]")
+      System.exit(1)
+    }
+
+    val sparkMaster = args(0)
+    val outputPath = args(1)
+    val numPoints = args(2).toInt
+    val k = args(3).toInt
+    val d = args(4).toInt
+    val r = args(5).toDouble
+    val parts = if (args.length >= 7) args(6).toInt else 2
+
+    val sc = new SparkContext(sparkMaster, "KMeansDataGenerator")
+    val data = generateKMeansRDD(sc, numPoints, k, d, r, parts)
+    data.map(_.mkString(" ")).saveAsTextFile(outputPath)
+
+    System.exit(0)
+  }
+}
+
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bb96ad4ae33ed0624dfd4cc593407f01c80d797f..c822f49e786c700ed92fd820eed3411017eb90f9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
         "it.unimi.dsi" % "fastutil" % "6.4.4",
         "colt" % "colt" % "1.2.0",
         "net.liftweb" % "lift-json_2.9.2" % "2.5",
-        "org.apache.mesos" % "mesos" % "0.12.0-incubating",
+        "org.apache.mesos" % "mesos" % "0.9.0-incubating",
         "io.netty" % "netty-all" % "4.0.0.Beta2",
         "org.apache.derby" % "derby" % "10.4.2.0" % "test",
         "com.codahale.metrics" % "metrics-core" % "3.0.0",
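Tying the MLlib pieces together: a hedged end-to-end sketch that generates synthetic clusters with `generateKMeansRDD` and fits them with the multi-run `KMeans.train` overload used in `main` above (sizes and master URL are illustrative):

```scala
import spark.SparkContext
import spark.mllib.clustering.KMeans
import spark.mllib.util.KMeansDataGenerator

object KMeansDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "KMeansDemo")
    // 10,000 points in 3 dimensions around 5 centers, centers scaled by r = 4.0.
    val data = KMeansDataGenerator.generateKMeansRDD(sc, 10000, 5, 3, 4.0).cache()
    val model = KMeans.train(data, 5, 20, 3)  // k = 5, 20 iterations, 3 runs
    println("cost = " + model.computeCost(data))
    model.clusterCenters.foreach(c => println(c.mkString(" ")))
    sc.stop()
  }
}
```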