diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5d9a0357ad480ab2e2c1457921ee706d95a3636f..eef25ef588386bca5a994f43e6b2bdb9ef6601cb 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -39,7 +39,7 @@ import spark.partial.PartialResult
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
 import spark.scheduler._
 import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler, TaskSetQueuesManager}
+import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.BlockManagerUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
@@ -75,11 +75,6 @@ class SparkContext(
     System.setProperty("spark.driver.port", "0")
   }
 
-  //Set the default task scheduler
-  if (System.getProperty("spark.cluster.taskscheduler") == null) {
-    System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager")
-  }
-
   private val isLocal = (master == "local" || master.startsWith("local["))
 
   // Create the Spark execution environment (cache, map output tracker, etc)
@@ -599,8 +594,7 @@ class SparkContext(
     val callSite = Utils.getSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler
-      ,localProperties.value)
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 11fec568c60778d4db12c4eb4a2dcb10d11c5efd..303c211e2a17260f49603fc7bce816bc4f78a09f 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -1,6 +1,5 @@
 package spark.scheduler
 
-
 import java.util.Properties
 
 import spark.scheduler.cluster.TaskInfo
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 47a426a45b8bc25bb3492b5ad8c0e2dbf36a0b16..18cc15c2a5ee9e42fa47617c3e79344a7a7f6348 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -86,14 +86,14 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
       }
     }
 
-    //finally create "default" pool
-    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
-      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
-      rootPool.addSchedulable(pool)
-      logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
-        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+    //finally create "default" pool
+    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
+      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+      rootPool.addSchedulable(pool)
+      logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+    }
   }
-}
 
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
     var poolName = DEFAULT_POOL_NAME
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
deleted file mode 100644
index 86971d47e6147b0988d05968cbf940f0b5a15fd4..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * An interface for managing TaskSet queue/s that allows plugging different policy for
- * offering tasks to resources
- */
-private[spark] trait TaskSetQueuesManager {
-  def addTaskSetManager(manager: TaskSetManager): Unit
-  def removeTaskSetManager(manager: TaskSetManager): Unit
-  def taskFinished(manager: TaskSetManager): Unit
-  def removeExecutor(executorId: String, host: String): Unit
-  def receiveOffer(execId: String, host:String, avaiableCpus:Double):Option[TaskDescription]
-  def checkSpeculatableTasks(): Boolean
-}
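Note for reviewers: the SchedulableBuilder hunk guarantees that a pool named "default" always exists in the fair-scheduler hierarchy, and the addTaskSetManager override that follows it starts from DEFAULT_POOL_NAME. The following is a minimal, self-contained sketch of that fallback pattern; Schedulable and Pool here are simplified stand-ins for the real spark.scheduler.cluster types, not their actual definitions.

import scala.collection.mutable

trait Schedulable {
  def name: String
}

class Pool(val name: String) extends Schedulable {
  private val children = mutable.LinkedHashMap[String, Schedulable]()

  def getSchedulableByName(poolName: String): Schedulable =
    children.getOrElse(poolName, null)

  def addSchedulable(schedulable: Schedulable): Unit =
    children(schedulable.name) = schedulable
}

object DefaultPoolSketch {
  val DEFAULT_POOL_NAME = "default"

  // Mirrors the end of buildPools above: ensure a "default" pool exists so
  // task sets that never name a pool still have somewhere to be scheduled.
  def ensureDefaultPool(rootPool: Pool): Unit = {
    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
      rootPool.addSchedulable(new Pool(DEFAULT_POOL_NAME))
    }
  }

  // Mirrors the lookup in addTaskSetManager: use the requested pool if it
  // exists, otherwise fall back to the "default" pool.
  def resolvePool(rootPool: Pool, requestedName: String): Schedulable = {
    val requested = rootPool.getSchedulableByName(requestedName)
    if (requested != null) requested
    else rootPool.getSchedulableByName(DEFAULT_POOL_NAME)
  }
}

With this shape, ensureDefaultPool(root) followed by resolvePool(root, "etl") returns the "etl" pool when it was built, and the "default" pool otherwise, so a misspelled or unconfigured pool name degrades gracefully instead of failing the job.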
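The SparkContext hunk also threads localProperties.value into DAGScheduler.runJob. As a rough illustration of why that matters, here is a sketch under the assumption that localProperties is a thread-local Properties holder (the actual field in SparkContext may be defined differently): properties set by user code on the submitting thread are captured once at submission time and travel with the job into the scheduler.

import java.util.Properties

object LocalPropertiesSketch {
  // Assumption: a thread-local holder; the real SparkContext field may not
  // be implemented exactly this way.
  private val localProperties = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }

  // User code calls this on the submitting thread, e.g. to choose a fair
  // scheduler pool for every job that thread submits.
  def setLocalProperty(key: String, value: String): Unit =
    localProperties.get().setProperty(key, value)

  // Capture the caller's properties at submission time, as the edited
  // runJob does with localProperties.value, so a scheduler running on a
  // different thread can still read them.
  def runJob[T](body: Properties => T): T = {
    val props = localProperties.get()
    body(props)
  }
}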