diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala index f80823317bde7adac3af0b450ce7ba5c9af41080..114617c51a3c0ace5b00cd1a91ffa7d45cd865cb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala @@ -17,14 +17,12 @@ package org.apache.spark.scheduler.cluster -import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException} -import java.util.Properties - -import scala.xml.XML +import java.io.{FileInputStream, InputStream} +import java.util.{NoSuchElementException, Properties} import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import scala.xml.XML /** * An interface to build Schedulable tree @@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file")) + val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" @@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val DEFAULT_WEIGHT = 1 override def buildPools() { - if (schedulerAllocFile != null) { - val file = new File(schedulerAllocFile) - if (file.exists()) { - val xml = XML.loadFile(file) - for (poolNode <- (xml \\ POOLS_PROPERTY)) { - - val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode") - } - } - - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } - - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } - - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) + var is: Option[InputStream] = None + try { + is = Option { + schedulerAllocFile.map { f => + new FileInputStream(f) + }.getOrElse { + getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) } - } else { - throw new java.io.FileNotFoundException( - "Fair scheduler allocation file not found: " + schedulerAllocFile) } + + is.foreach { i => buildFairSchedulerPool(i) } + } finally { + is.foreach(_.close()) } // finally create "default" pool + buildDefaultPool() + } + + private def buildDefaultPool() { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) @@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) } } + private def buildFairSchedulerPool(is: InputStream) { + val xml = XML.load(is) + for (poolNode <- (xml \\ POOLS_PROPERTY)) { + + val poolName = (poolNode \ POOL_NAME_PROPERTY).text + var schedulingMode = DEFAULT_SCHEDULING_MODE + var minShare = DEFAULT_MINIMUM_SHARE + var weight = DEFAULT_WEIGHT + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + if (xmlSchedulingMode != "") { + try { + schedulingMode = SchedulingMode.withName(xmlSchedulingMode) + } catch { + case e: NoSuchElementException => + logWarning("Error xml schedulingMode, using default schedulingMode") + } + } + + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + if (xmlMinShare != "") { + minShare = xmlMinShare.toInt + } + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + if (xmlWeight != "") { + weight = xmlWeight.toInt + } + + val pool = new Pool(poolName, schedulingMode, minShare, weight) + rootPool.addSchedulable(pool) + logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + poolName, schedulingMode, minShare, weight)) + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName)