Skip to content
Snippets Groups Projects
Commit 834686b1 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #928 from jerryshao/fairscheduler-refactor

Refactor FairSchedulableBuilder
parents a2ea069a 77e9da1f
No related branches found
No related tags found
No related merge requests found
......@@ -17,14 +17,12 @@
package org.apache.spark.scheduler.cluster
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
import java.util.Properties
import scala.xml.XML
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
import org.apache.spark.Logging
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import scala.xml.XML
/**
* An interface to build Schedulable tree
......@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
......@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
var is: Option[InputStream] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
} else {
throw new java.io.FileNotFoundException(
"Fair scheduler allocation file not found: " + schedulerAllocFile)
}
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
// finally create "default" pool
buildDefaultPool()
}
private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
......@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
private def buildFairSchedulerPool(is: InputStream) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: NoSuchElementException =>
logWarning("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment