Commit 6f0d2c1c authored by Justin Ma

round robin scheduling of tasks has been added

parent e9ffe6ca
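In outline, the scheduler change below keeps cycling over the outstanding slave offers: each pass hands at most one task to every offer that still has room, deducts that task's CPU and memory from the offer, and stops once a complete pass launches nothing. A standalone sketch of that loop in plain Scala (Offer, Task and tryLaunch are illustrative stand-ins, not the Mesos SlaveOffer/TaskDescription API used in the diff):

import scala.collection.mutable.ArrayBuffer

case class Task(cpus: Int, mem: Int)
class Offer(var cpus: Int, var mem: Int)

// Round-robin over offers: at most one task per offer per pass, until nothing fits anywhere.
def roundRobin(offers: Seq[Offer], tryLaunch: Offer => Option[Task]): Seq[Task] = {
  val launched = new ArrayBuffer[Task]
  var placedSomething = true
  while (placedSomething) {
    placedSomething = false
    for (offer <- offers) {
      tryLaunch(offer) match {
        case Some(task) =>
          offer.cpus -= task.cpus   // charge the task against this offer's remaining resources
          offer.mem -= task.mem
          launched += task
          placedSomething = true    // something fit, so make another pass over all offers
        case None =>                // this offer is exhausted or no pending tasks remain
      }
    }
  }
  launched
}

In the actual hunk, activeOp.slaveOffer plays the role of tryLaunch, and the per-offer running totals live in availableCpus / availableMem.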
@@ -25,7 +25,7 @@ extends RDD[String](sc) {
   ConfigureLock.synchronized { inputFormat.configure(conf) }
   @transient val splits_ =
-    inputFormat.getSplits(conf, 2).map(new HdfsSplit(_)).toArray
+    inputFormat.getSplits(conf, sc.scheduler.numCores).map(new HdfsSplit(_)).toArray
   override def splits = splits_.asInstanceOf[Array[Split]]
...
@@ -3,6 +3,7 @@ package spark
 import java.io.File
 import scala.collection.mutable.Map
+import scala.collection.JavaConversions._
 import mesos.{Scheduler => NScheduler}
 import mesos._
@@ -105,10 +106,20 @@ extends NScheduler with spark.Scheduler
     val tasks = new java.util.ArrayList[TaskDescription]
     if (activeOp != null) {
       try {
-        for (i <- 0 until offers.size.toInt) {
-          activeOp.slaveOffer(offers.get(i)) match {
-            case Some(task) => tasks.add(task)
-            case None => {}
+        val availableCpus = offers.map(_.getParams.get("cpus").toInt)
+        val availableMem = offers.map(_.getParams.get("mem").toInt)
+        var resourcesAvailable = true
+        while (resourcesAvailable) {
+          resourcesAvailable = false
+          for (i <- 0 until offers.size.toInt) {
+            activeOp.slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+              case Some(task) =>
+                tasks.add(task)
+                availableCpus(i) -= task.getParams.get("cpus").toInt
+                availableMem(i) -= task.getParams.get("mem").toInt
+                resourcesAvailable = resourcesAvailable || true
+              case None => {}
+            }
           }
         }
       } catch {
@@ -162,7 +173,7 @@ extends NScheduler with spark.Scheduler
 // Trait representing an object that manages a parallel operation by
 // implementing various scheduler callbacks.
 trait ParallelOperation {
-  def slaveOffer(s: SlaveOffer): Option[TaskDescription]
+  def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription]
   def statusUpdate(t: TaskStatus): Unit
   def error(code: Int, message: String): Unit
 }
@@ -207,7 +218,7 @@ extends ParallelOperation
     }
   }
-  def slaveOffer(offer: SlaveOffer): Option[TaskDescription] = {
+  def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription] = {
     if (tasksLaunched < numTasks) {
       var checkPrefVals: Array[Boolean] = Array(true)
       val time = System.currentTimeMillis
@@ -215,9 +226,8 @@ extends ParallelOperation
         checkPrefVals = Array(true, false) // Allow non-preferred tasks
       // TODO: Make desiredCpus and desiredMem configurable
       val desiredCpus = 1
-      val desiredMem = 750
-      if (offer.getParams.get("cpus").toInt < desiredCpus ||
-          offer.getParams.get("mem").toInt < desiredMem)
+      val desiredMem = 500
+      if ((availableCpus < desiredCpus) || (availableMem < desiredMem))
         return None
       for (checkPref <- checkPrefVals; i <- 0 until numTasks) {
         if (!launched(i) && (!checkPref ||
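Because the fit check now runs against the caller-maintained availableCpus / availableMem totals rather than the raw offer, a single large offer can accept several tasks over successive round-robin passes. A tiny worked example with illustrative numbers, assuming the desiredCpus = 1 and desiredMem = 500 defaults above:

// One offer advertising 4 CPUs and 2000 MB; every task wants 1 CPU and 500 MB.
var cpus = 4
var mem = 2000
var placed = 0
while (cpus >= 1 && mem >= 500) { // i.e. NOT ((availableCpus < desiredCpus) || (availableMem < desiredMem))
  cpus -= 1
  mem -= 500
  placed += 1
}
println(placed) // prints 4: four tasks fit before this offer is exhausted

Before this change, the scheduler asked each offer for at most one task per resource-offer callback, regardless of how much room the offer had.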
@@ -264,7 +274,7 @@ extends ParallelOperation
   def taskFinished(status: TaskStatus) {
     val tid = status.getTaskId
-    println("Finished TID " + tid)
+    print("Finished TID " + tid)
     if (!finished(tidToIndex(tid))) {
       // Deserialize task result
       val result = Utils.deserialize[TaskResult[T]](status.getData)
@@ -274,10 +284,12 @@ extends ParallelOperation
       // Mark finished and stop if we've finished all the tasks
       finished(tidToIndex(tid)) = true
       tasksFinished += 1
+      println(", finished " + tasksFinished + "/" + numTasks)
       if (tasksFinished == numTasks)
         setAllFinished()
     } else {
-      printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+      printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
     }
   }
...
@@ -25,7 +25,7 @@ class SparkContext(master: String, frameworkName: String) {
   val LOCAL_REGEX = """local\[([0-9]+)\]""".r
-  private var scheduler: Scheduler = master match {
+  private[spark] var scheduler: Scheduler = master match {
     case "local" => new LocalScheduler(1)
     case LOCAL_REGEX(threads) => new LocalScheduler(threads.toInt)
     case _ => { System.loadLibrary("mesos");
...
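Finally, widening scheduler from private to private[spark] in this last hunk is what lets the HDFS RDD in the first hunk read sc.scheduler.numCores when picking a split count. A minimal sketch of what the package-private modifier allows (stand-in classes, not the real ones):

package spark {
  class Scheduler { def numCores: Int = 4 }                                  // stand-in
  class SparkContext { private[spark] var scheduler: Scheduler = new Scheduler }
  class HdfsTextFile(sc: SparkContext) {
    def numSplits: Int = sc.scheduler.numCores                               // compiles: same package
  }
}
// Outside the spark package, sc.scheduler remains inaccessible; the old plain
// `private` hid it even from other spark classes such as HdfsTextFile.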