diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 375636071db2b1c02419dd8f54bbee2ecf2cdbff..77471602901aae99dd163f71f471b832cc759007 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -44,9 +44,10 @@ import org.apache.mesos.MesosNativeLibrary import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} -import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob} -import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable, -SchedulingMode} +import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, + SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob} +import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, + ClusterScheduler, Schedulable, SchedulingMode} import spark.scheduler.local.LocalScheduler import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.{StorageStatus, StorageUtils, RDDInfo} diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index c865743e376d3ae515a8abc3b20b54a24bb7648a..5fda78e152c583d0c07059076acebd9f97440ac2 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -473,7 +473,8 @@ class DAGScheduler( } if (tasks.size > 0) { val properties = idToActiveJob(stage.priority).properties - sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size, properties))) + sparkListeners.foreach(_.onStageSubmitted( + SparkListenerStageSubmitted(stage, tasks.size, properties))) logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 07372ee78676f6d8db64e7e1ae187a2f162eec8e..49f7c85c292ecac8e5b31d0665a67e675cfe655a 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -8,17 +8,18 @@ import spark.executor.TaskMetrics sealed trait SparkListenerEvents -case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) extends SparkListenerEvents +case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) + extends SparkListenerEvents case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo, taskMetrics: TaskMetrics) extends SparkListenerEvents -case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) +case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) extends SparkListenerEvents -case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) +case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) extends SparkListenerEvents trait SparkListener { @@ -26,12 +27,12 @@ trait SparkListener { * Called when a stage is completed, with information on the completed stage */ def onStageCompleted(stageCompleted: StageCompleted) { } - + /** * Called when a stage is submitted */ def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { } - + /** * Called when a task ends */ @@ -41,12 +42,12 @@ trait SparkListener { * Called when a job starts */ def onJobStart(jobStart: SparkListenerJobStart) { } - + /** * Called when a job ends */ def onJobEnd(jobEnd: SparkListenerJobEnd) { } - + } /** diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 74b3e43d2b1e99529519b12889a8996b4717104a..20680bbf87ecdaef7d25c1ffcac7f8f8d1bc2a46 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -9,10 +9,10 @@ import scala.collection.mutable.HashSet import spark._ import spark.TaskState.TaskState import spark.scheduler._ +import spark.scheduler.cluster.SchedulingMode.SchedulingMode import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong import java.util.{TimerTask, Timer} -import spark.scheduler.cluster.SchedulingMode.SchedulingMode /** * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call @@ -257,7 +257,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue() for (manager <- sortedTaskSetQueue) { - logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks)) + logInfo("parentName:%s, name:%s, runningTasks:%s".format( + manager.parent.name, manager.name, manager.runningTasks)) } for (manager <- sortedTaskSetQueue) { diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index 4d11b0959af44e9bc4c42a72e96d0e31cd690fa8..5e2351bafd8f8e6a9f6f77565d1330e4395b292e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -644,7 +644,8 @@ private[spark] class ClusterTaskSetManager( } } - // TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed + // TODO: for now we just find Pool not TaskSetManager + // we can extend this function in future if needed override def getSchedulableByName(name: String): Schedulable = { return null } diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala index a2fa80aa36f66d8cf128a7167453b7b28e010bad..db51b4849454527054a13c7735dd1211bd1d6873 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala @@ -24,7 +24,8 @@ private[spark] trait SchedulableBuilder { def addTaskSetManager(manager: Schedulable, properties: Properties) } -private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { +private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) + extends SchedulableBuilder with Logging { override def buildPools() { // nothing @@ -35,7 +36,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula } } -private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { +private[spark] class FairSchedulableBuilder(val rootPool: Pool) + extends SchedulableBuilder with Logging { val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified") val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" @@ -88,7 +90,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula // finally create "default" pool if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { - val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, + DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(pool) logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) @@ -102,8 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) parentPool = rootPool.getSchedulableByName(poolName) if (parentPool == null) { - // we will create a new pool that user has configured in app instead of being defined in xml file - parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + // we will create a new pool that user has configured in app + // instead of being defined in xml file + parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, + DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala index a7f0f6f3932817b79bdc76e2b6093ca4ea4dfb0a..cd0642772d8738239fa07bf7c94829714c7c691e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala @@ -1,7 +1,8 @@ package spark.scheduler.cluster /** - * "FAIR" and "FIFO" determines which policy is used to order tasks amongst a Schedulable's sub-queues + * "FAIR" and "FIFO" determines which policy is used + * to order tasks amongst a Schedulable's sub-queues * "NONE" is used when the a Schedulable has no sub-queues. */ object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") { diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index abef683791198d24366af99aee5c61e91bc99c7e..04651e9c605fdbe3b3fb573597891d9b32fe2aee 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -40,9 +40,12 @@ private[spark] class IndexPage(parent: JobProgressUI) { </div> </div> ++ <h3>Pools </h3> ++ poolTable.toNodeSeq ++ - <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq++ - <h3>Completed Stages : {completedStages.size}</h3> ++ completedStagesTable.toNodeSeq++ - <h3>Failed Stages : {failedStages.size}</h3> ++ failedStagesTable.toNodeSeq + <h3>Active Stages : {activeStages.size}</h3> ++ + activeStagesTable.toNodeSeq++ + <h3>Completed Stages : {completedStages.size}</h3> ++ + completedStagesTable.toNodeSeq++ + <h3>Failed Stages : {failedStages.size}</h3> ++ + failedStagesTable.toNodeSeq headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs) } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala index d4767bea2241da5cd4c513dc08817ff57eb1945e..da767b3c0a095371291fce2f7f7c2506f7bbe7aa 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala @@ -53,7 +53,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList activeStages += stage var poolName = DEFAULT_POOL_NAME if (stageSubmitted.properties != null) { - poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME) + poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", + DEFAULT_POOL_NAME) } stageToPool(stage) = poolName val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())