diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index f8bee3eea5ce25a8b06ea66852b5ecfc15132233..e53c4fb5b47784cfdc4809edc4f1fbc24b552642 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.io.{FileInputStream, InputStream}
 import java.util.{NoSuchElementException, Properties}
 
+import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
 import org.apache.spark.SparkConf
@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
+  val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+  val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-    var is: Option[InputStream] = None
+    var fileData: Option[(InputStream, String)] = None
     try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+      fileData = schedulerAllocFile.map { f =>
+        val fis = new FileInputStream(f)
+        logInfo(s"Creating Fair Scheduler pools from $f")
+        Some((fis, f))
+      }.getOrElse {
+        val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        if (is != null) {
+          logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
+          Some((is, DEFAULT_SCHEDULER_FILE))
+        } else {
+          logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
+            s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
+            s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+          None
         }
       }
 
-      is.foreach { i => buildFairSchedulerPool(i) }
+      fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
+    } catch {
+      case NonFatal(t) =>
+        val defaultMessage = "Error while building the fair scheduler pools"
+        val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
+          .getOrElse(defaultMessage)
+        logError(message, t)
+        throw t
     } finally {
-      is.foreach(_.close())
+      fileData.foreach { case (is, fileName) => is.close() }
     }
 
     // finally create "default" pool
@@ -93,24 +111,27 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
-      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+      logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
     }
   }
 
-  private def buildFairSchedulerPool(is: InputStream) {
+  private def buildFairSchedulerPool(is: InputStream, fileName: String) {
     val xml = XML.load(is)
     for (poolNode <- (xml \\ POOLS_PROPERTY)) {
 
       val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-      val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
-      val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
-      val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
+      val schedulingMode = getSchedulingModeValue(poolNode, poolName,
+        DEFAULT_SCHEDULING_MODE, fileName)
+      val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
+        DEFAULT_MINIMUM_SHARE, fileName)
+      val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
+        DEFAULT_WEIGHT, fileName)
 
       rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
-      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+      logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         poolName, schedulingMode, minShare, weight))
     }
   }
@@ -118,11 +139,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   private def getSchedulingModeValue(
       poolNode: Node,
       poolName: String,
-      defaultValue: SchedulingMode): SchedulingMode = {
+      defaultValue: SchedulingMode,
+      fileName: String): SchedulingMode = {
     val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
-    val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
-      s"schedulingMode: $defaultValue for pool: $poolName"
+    val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
+      s"Fair Scheduler configuration file: $fileName, using " +
+      s"the default schedulingMode: $defaultValue for pool: $poolName"
     try {
       if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
         SchedulingMode.withName(xmlSchedulingMode)
@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   private def getIntValue(
       poolNode: Node,
       poolName: String,
-      propertyName: String, defaultValue: Int): Int = {
+      propertyName: String,
+      defaultValue: Int,
+      fileName: String): Int = {
     val data = (poolNode \ propertyName).text.trim
     try {
       data.toInt
     } catch {
       case e: NumberFormatException =>
-        logWarning(s"Error while loading scheduler allocation file. " +
+        logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
           s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
           s"$defaultValue for pool: $poolName")
         defaultValue
@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
         parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
           DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
-        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
       }
     }
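
A minimal usage sketch (not part of the patch) showing how an application would exercise the code paths changed above: it points the fair scheduler at an allocation file via spark.scheduler.allocation.file (the SCHEDULER_ALLOCATION_FILE_PROPERTY introduced here) and routes jobs to a pool via spark.scheduler.pool. The master, app name, file path, and pool name are placeholders for illustration only.

import org.apache.spark.{SparkConf, SparkContext}

object FairPoolsExample {
  def main(args: Array[String]): Unit = {
    // Enable the fair scheduler and point it at an allocation file. If this
    // property is unset, FairSchedulableBuilder falls back to fairscheduler.xml
    // on the classpath and, failing that, logs the new FIFO-fallback warning.
    val conf = new SparkConf()
      .setMaster("local[*]")            // placeholder master
      .setAppName("fair-pools-example") // placeholder app name
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical path
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to the "production" pool (placeholder
    // name); a pool missing from the XML is created on demand with the default
    // FIFO settings, as logged in the last hunk above.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 100).count()

    sc.stop()
  }
}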