Skip to content
Snippets Groups Projects
Commit dadff5f0 authored by Eren Avsarogullari's avatar Eren Avsarogullari Committed by Kay Ousterhout
Browse files

[SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Logging

Fair Scheduler Logging for the following cases can be useful for the user.

1. If **valid** `spark.scheduler.allocation.file` property is set, user can be informed and aware which scheduler file is processed when `SparkContext` initializes.

2. If **invalid** `spark.scheduler.allocation.file` property is set, currently, the following stacktrace is shown to user. In addition to this, more meaningful message can be shown to user by emphasizing the problem at building level of fair scheduler. Also other potential issues can be covered at this level as **Fair Scheduler can not be built. + exception stacktrace**
```
Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76)
	at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75)
```
3. If `spark.scheduler.allocation.file` property is not set and **default** fair scheduler file (**fairscheduler.xml**) is found in classpath, it will be loaded but currently, user is not informed for using default file so logging can be useful as **Fair Scheduler file: fairscheduler.xml is found successfully and will be parsed.**

4. If **spark.scheduler.allocation.file** property is not set and **default** fair scheduler file does not exist in classpath, currently, user is not informed so logging can be useful as **No Fair Scheduler file found.**

Also this PR is related with https://github.com/apache/spark/pull/15237 to emphasize fileName in warning logs when fair scheduler file has invalid minShare, weight or schedulingMode values.

## How was this patch tested?
Added new Unit Tests.

Author: erenavsarogullari <erenavsarogullari@gmail.com>

Closes #16813 from erenavsarogullari/SPARK-19466.
parent c5a66356
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
import scala.util.control.NonFatal
import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
......@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
......@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val DEFAULT_WEIGHT = 1
override def buildPools() {
var is: Option[InputStream] = None
var fileData: Option[(InputStream, String)] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
fileData = schedulerAllocFile.map { f =>
val fis = new FileInputStream(f)
logInfo(s"Creating Fair Scheduler pools from $f")
Some((fis, f))
}.getOrElse {
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
if (is != null) {
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
None
}
}
is.foreach { i => buildFairSchedulerPool(i) }
fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
} catch {
case NonFatal(t) =>
val defaultMessage = "Error while building the fair scheduler pools"
val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
.getOrElse(defaultMessage)
logError(message, t)
throw t
} finally {
is.foreach(_.close())
fileData.foreach { case (is, fileName) => is.close() }
}
// finally create "default" pool
......@@ -93,24 +111,27 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
private def buildFairSchedulerPool(is: InputStream) {
private def buildFairSchedulerPool(is: InputStream, fileName: String) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
val schedulingMode = getSchedulingModeValue(poolNode, poolName,
DEFAULT_SCHEDULING_MODE, fileName)
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
DEFAULT_MINIMUM_SHARE, fileName)
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
DEFAULT_WEIGHT, fileName)
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
......@@ -118,11 +139,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
defaultValue: SchedulingMode): SchedulingMode = {
defaultValue: SchedulingMode,
fileName: String): SchedulingMode = {
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
s"schedulingMode: $defaultValue for pool: $poolName"
val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
s"Fair Scheduler configuration file: $fileName, using " +
s"the default schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
......@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getIntValue(
poolNode: Node,
poolName: String,
propertyName: String, defaultValue: Int): Int = {
propertyName: String,
defaultValue: Int,
fileName: String): Int = {
val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
logWarning(s"Error while loading scheduler allocation file. " +
logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
......@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment