From 8436bd5d4a96480ac1871330a28d9d712e64959d Mon Sep 17 00:00:00 2001 From: Andrew xia <junluan.xia@intel.com> Date: Fri, 19 Apr 2013 02:17:22 +0800 Subject: [PATCH] remove TaskSetQueueManager and update code style --- core/src/main/scala/spark/SparkContext.scala | 10 ++-------- .../spark/scheduler/DAGSchedulerEvent.scala | 1 - .../scheduler/cluster/SchedulableBuilder.scala | 14 +++++++------- .../scheduler/cluster/TaskSetQueuesManager.scala | 16 ---------------- 4 files changed, 9 insertions(+), 32 deletions(-) delete mode 100644 core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 5d9a0357ad..eef25ef588 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -39,7 +39,7 @@ import spark.partial.PartialResult import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} import spark.scheduler._ import spark.scheduler.local.LocalScheduler -import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler, TaskSetQueuesManager} +import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.BlockManagerUI import spark.util.{MetadataCleaner, TimeStampedHashMap} @@ -75,11 +75,6 @@ class SparkContext( System.setProperty("spark.driver.port", "0") } - //Set the default task scheduler - if (System.getProperty("spark.cluster.taskscheduler") == null) { - System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager") - } - private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) @@ -599,8 +594,7 @@ class SparkContext( val callSite = Utils.getSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler - ,localProperties.value) + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala index 11fec568c6..303c211e2a 100644 --- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala @@ -1,6 +1,5 @@ package spark.scheduler - import java.util.Properties import spark.scheduler.cluster.TaskInfo diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala index 47a426a45b..18cc15c2a5 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala @@ -86,14 +86,14 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula } } - //finally create "default" pool - if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { - val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) - rootPool.addSchedulable(pool) - logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( - DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + //finally create "default" pool + if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { + val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + rootPool.addSchedulable(pool) + logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( + DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + } } -} override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala deleted file mode 100644 index 86971d47e6..0000000000 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala +++ /dev/null @@ -1,16 +0,0 @@ -package spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer - -/** - * An interface for managing TaskSet queue/s that allows plugging different policy for - * offering tasks to resources - */ -private[spark] trait TaskSetQueuesManager { - def addTaskSetManager(manager: TaskSetManager): Unit - def removeTaskSetManager(manager: TaskSetManager): Unit - def taskFinished(manager: TaskSetManager): Unit - def removeExecutor(executorId: String, host: String): Unit - def receiveOffer(execId: String, host:String, avaiableCpus:Double):Option[TaskDescription] - def checkSpeculatableTasks(): Boolean -} -- GitLab