Commit 044a088c authored by Reynold Xin

Merge pull request #831 from rxin/scheduler

A few small scheduler / job description changes.
parents 839f2d4f 290e3e6e
@@ -267,16 +267,20 @@ class SparkContext(
localProperties.value = new Properties()
}
def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
def setLocalProperty(key: String, value: String) {
if (localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
if (value == null) {
localProperties.value.remove(key)
} else {
localProperties.value.setProperty(key, value)
}
}
/** Set a human readable description of the current job. */
def setDescription(value: String) {
addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
// Post init
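The first hunk renames addLocalProperty to setLocalProperty (which now clears the property when passed null) and setDescription to setJobDescription. A minimal usage sketch of the renamed API, assuming a driver program with an existing SparkContext `sc`; the pool name and the sample job are illustrative, not part of this commit:

// Sketch only: local properties affect jobs submitted from the current thread.
sc.setJobDescription("Nightly aggregation")  // human-readable label shown in the UI
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")
sc.parallelize(1 to 1000, 10).map(_ * 2).count()  // runs in the "production" pool

// Passing null now removes the thread-local property instead of trying to store a null value.
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", null)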
@@ -17,19 +17,14 @@
package spark.scheduler.cluster
import java.io.{File, FileInputStream, FileOutputStream}
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.util.control.Breaks._
import scala.xml._
import scala.xml.XML
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.util.Properties
/**
* An interface to build Schedulable tree
@@ -56,7 +51,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -69,39 +64,44 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, schedulingMode, minShare, weight))
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
} else {
throw new java.io.FileNotFoundException(
"Fair scheduler allocation file not found: " + schedulerAllocFile)
}
}
@@ -110,7 +110,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
@@ -127,7 +127,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
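Besides the tidier log messages, buildPools now treats a missing allocation file as an error: if spark.fairscheduler.allocation.file is set but the file does not exist, a FileNotFoundException is thrown instead of silently falling back to the default pool only. A minimal sketch of the same parse-with-defaults pattern; the XML element names ("pool", "weight") and the minimum-share default are assumptions, since the diff does not show them:

import java.io.{File, FileNotFoundException}
import scala.xml.XML

// Sketch only: mirrors the buildPools pattern under the assumptions named above.
val allocFile = System.getProperty("spark.fairscheduler.allocation.file")
if (allocFile != null) {
  val file = new File(allocFile)
  if (!file.exists()) {
    throw new FileNotFoundException("Fair scheduler allocation file not found: " + allocFile)
  }
  for (poolNode <- XML.loadFile(file) \\ "pool") {
    // Missing fields fall back to defaults rather than failing the whole file.
    val minShare = (poolNode \ "minShare").text match {
      case "" => 0                 // assumed DEFAULT_MINIMUM_SHARE
      case s  => s.toInt
    }
    val weight = (poolNode \ "weight").text match {
      case "" => 1                 // DEFAULT_WEIGHT shown in the diff
      case s  => s.toInt
    }
    println("pool: minShare=" + minShare + ", weight=" + weight)
  }
}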
@@ -48,9 +48,9 @@ private[spark] object UIWorkloadGenerator {
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
@@ -57,23 +57,23 @@ object TaskThreadInfo {
* 1. each thread contains one job.
* 2. each job contains one stage.
* 3. each stage only contains one task.
* 4. each launched task must be launched in order (using threadToStarted) to make sure it
*    gets a cpu core; it then waits to finish until the user manually releases its "Lock",
*    after which the cluster has another free cpu core.
* 5. each pending task must use "sleep" to make sure it has been added to the taskSetManager
*    queue, so that it will be scheduled later when the cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
}.start()
}
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
@@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// threads 5 and 6 (stages pending) must meet the following two points:
// 1. the stages (taskSetManagers) of the jobs in threads 5 and 6 should be added to the
//    taskSetManager queue before TaskThreadInfo.threadToLock(1).jobFinished() executes
// 2. the priority of the stage in thread 5 should be higher than that of the stage in thread 6
// So I just use "sleep" 1s here for each thread.
// TODO: any better solution?
@@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
@@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
@@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
@@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
assert(TaskThreadInfo.threadToRunning(32) === true)
// 1. Similar to the scenario above: sleep 1s so the stages of 23 and 33 are added to the
//    taskSetManager queue, and the cluster assigns a free cpu core to stage 23 once stage 11 finishes.
// 2. the priorities of 23 and 33 are meaningless since the fair scheduler is used here.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
@@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
@@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
sem.acquire(11)
}
}
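The suite drives the scheduler with one job per thread, blocking each single-task job on a shared lock and signalling completion through a semaphore so the assertions never race the scheduler. A compressed sketch of that coordination pattern; the names Gates and runBlockedJob are hypothetical and not part of the suite, and a local-mode SparkContext `sc` is assumed:

import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable.HashMap

// Hypothetical registry, in the spirit of TaskThreadInfo: tasks look up their latch here
// instead of capturing it in the closure, so nothing non-serializable rides along with the task.
object Gates {
  val latches = new HashMap[Int, CountDownLatch]
}

// Launch one single-task job from its own thread, optionally pinned to a fair-scheduler pool.
def runBlockedJob(sc: spark.SparkContext, id: Int, pool: String, done: Semaphore) {
  Gates.latches(id) = new CountDownLatch(1)
  new Thread {
    override def run() {
      if (pool != null) {
        // Local properties are per thread, so only this thread's jobs land in `pool`.
        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", pool)
      }
      sc.parallelize(id to id, 1).map { i =>
        Gates.latches(i).await()   // hold the cpu core until the test opens the gate
        i
      }.collect()
      done.release()               // tell the test this job has finished
    }
  }.start()
}

// Illustrative use: start two blocked jobs in pools "1" and "2", then release them in order.
//   val done = new Semaphore(0)
//   runBlockedJob(sc, 1, "1", done); runBlockedJob(sc, 2, "2", done)
//   Gates.latches(1).countDown(); Gates.latches(2).countDown(); done.acquire(2)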