Skip to content
Snippets Groups Projects
Commit 8436bd5d authored by Andrew xia's avatar Andrew xia
Browse files

remove TaskSetQueueManager and update code style

parent e0603d7e
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,7 @@ import spark.partial.PartialResult
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler._
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler, TaskSetQueuesManager}
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
......@@ -75,11 +75,6 @@ class SparkContext(
System.setProperty("spark.driver.port", "0")
}
//Set the default task scheduler
if (System.getProperty("spark.cluster.taskscheduler") == null) {
System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager")
}
private val isLocal = (master == "local" || master.startsWith("local["))
// Create the Spark execution environment (cache, map output tracker, etc)
......@@ -599,8 +594,7 @@ class SparkContext(
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler
,localProperties.value)
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
......
package spark.scheduler
import java.util.Properties
import spark.scheduler.cluster.TaskInfo
......
......@@ -86,14 +86,14 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
//finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
//finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
......
package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
/**
* An interface for managing TaskSet queue/s that allows plugging different policy for
* offering tasks to resources
*/
private[spark] trait TaskSetQueuesManager {
def addTaskSetManager(manager: TaskSetManager): Unit
def removeTaskSetManager(manager: TaskSetManager): Unit
def taskFinished(manager: TaskSetManager): Unit
def removeExecutor(executorId: String, host: String): Unit
def receiveOffer(execId: String, host:String, avaiableCpus:Double):Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment