Commit 366c09c4 authored by Justin Ma

Let's use future instead of actors

parent 0896fd62
@@ -3,6 +3,7 @@ package spark
 import java.io.File
 import scala.collection.mutable.Map
+import scala.collection.mutable.Queue
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
@@ -32,6 +33,7 @@ extends NScheduler with spark.Scheduler
   val registeredLock = new Object()

   // Current callback object (may be null)
+  var activeOpsQueue = new Queue[Int]
   var activeOps = new HashMap[Int, ParallelOperation]
   private var nextOpId = 0
   private[spark] var taskIdToOpId = new HashMap[Int, Int]
@@ -72,8 +74,8 @@ extends NScheduler with spark.Scheduler
   override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
     var opId = 0
-    runTasksMutex.synchronized {
-      waitForRegister()
+    waitForRegister()
+    this.synchronized {
       opId = newOpId()
     }
     val myOp = new SimpleParallelOperation(this, tasks, opId)
@@ -81,12 +83,14 @@ extends NScheduler with spark.Scheduler
     try {
       this.synchronized {
         this.activeOps(myOp.opId) = myOp
+        this.activeOpsQueue += myOp.opId
       }
       driver.reviveOffers();
       myOp.join();
     } finally {
       this.synchronized {
         this.activeOps.remove(myOp.opId)
+        this.activeOpsQueue.dequeueAll(x => (x == myOp.opId))
       }
     }
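The two added lines above keep activeOpsQueue in step with activeOps: an operation's id is appended when the operation is registered and purged with dequeueAll when it finishes, giving the scheduler a FIFO record of which operations are still running. A minimal standalone sketch of those two Queue operations (the ids here are made up for illustration):

import scala.collection.mutable.Queue

// FIFO registry of running operations, mirroring activeOpsQueue.
val activeOpsQueue = new Queue[Int]
activeOpsQueue += 1
activeOpsQueue += 2
activeOpsQueue += 3

// dequeueAll removes every element matching the predicate, so a
// finished opId is dropped even from the middle of the queue.
activeOpsQueue.dequeueAll(x => (x == 2))
println(activeOpsQueue)  // Queue(1, 3)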
@@ -117,21 +121,24 @@ extends NScheduler with spark.Scheduler
     val tasks = new java.util.ArrayList[TaskDescription]
     val availableCpus = offers.map(_.getParams.get("cpus").toInt)
     val availableMem = offers.map(_.getParams.get("mem").toInt)
-    var resourcesAvailable = true
-    while (resourcesAvailable) {
-      resourcesAvailable = false
-      for (i <- 0 until offers.size.toInt; (opId, activeOp) <- activeOps) {
-        try {
-          activeOp.slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
-            case Some(task) =>
-              tasks.add(task)
-              availableCpus(i) -= task.getParams.get("cpus").toInt
-              availableMem(i) -= task.getParams.get("mem").toInt
-              resourcesAvailable = resourcesAvailable || true
-            case None => {}
-          }
-        } catch {
-          case e: Exception => e.printStackTrace
-        }
-      }
-    }
+    var launchedTask = true
+    for (opId <- activeOpsQueue) {
+      launchedTask = true
+      while (launchedTask) {
+        launchedTask = false
+        for (i <- 0 until offers.size.toInt) {
+          try {
+            activeOps(opId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+              case Some(task) =>
+                tasks.add(task)
+                availableCpus(i) -= task.getParams.get("cpus").toInt
+                availableMem(i) -= task.getParams.get("mem").toInt
+                launchedTask = launchedTask || true
+              case None => {}
+            }
+          } catch {
+            case e: Exception => e.printStackTrace
+          }
+        }
+      }
+    }
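The rewritten offer loop above is the point of the new queue: operations are visited in FIFO order, and for each one the scheduler keeps sweeping all offers until a full sweep launches nothing, so the oldest operation is packed onto the cluster before younger ones see any resources. A simplified, self-contained sketch of that control flow; slaveOffer here is a hypothetical stand-in for ParallelOperation.slaveOffer, not the real API:

object OfferLoopSketch {
  // Visit op ids in FIFO order; for each, sweep the offers repeatedly
  // until one whole sweep launches no task, then move to the next op.
  def fillOffers(opIds: Seq[Int],
                 slaveOffer: (Int, Int) => Option[String],
                 numOffers: Int): Seq[String] = {
    val tasks = scala.collection.mutable.ArrayBuffer[String]()
    for (opId <- opIds) {
      var launchedTask = true
      while (launchedTask) {
        launchedTask = false
        for (i <- 0 until numOffers) {
          slaveOffer(opId, i) match {
            case Some(task) =>
              tasks += task
              launchedTask = true
            case None => {}
          }
        }
      }
    }
    tasks
  }
}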
@@ -317,6 +324,7 @@ extends ParallelOperation
     println("Lost opId " + opId + " TID " + tid)
     if (!finished(tidToIndex(tid))) {
       launched(tidToIndex(tid)) = false
+      sched.taskIdToOpId.remove(tid)
       tasksLaunched -= 1
     } else {
       printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
@@ -6,16 +6,6 @@ import java.util.UUID
 import scala.collection.mutable.ArrayBuffer
 import scala.actors.Actor._

-case class SparkAsyncLock(var finished: Boolean = false) {
-  def join() {
-    this.synchronized {
-      while (!finished) {
-        this.wait
-      }
-    }
-  }
-}
-
 class SparkContext(master: String, frameworkName: String) {
   Broadcast.initialize(true)
@@ -32,18 +22,6 @@ class SparkContext(master: String, frameworkName: String) {
   def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local)
   //def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local)

-  def fork(f: => Unit): SparkAsyncLock = {
-    val thisLock = new SparkAsyncLock
-    actor {
-      f
-      thisLock.synchronized {
-        thisLock.finished = true
-        thisLock.notifyAll()
-      }
-    }
-    thisLock
-  }
-
   def textFile(path: String) = new HdfsTextFile(this, path)

   val LOCAL_REGEX = """local\[([0-9]+)\]""".r
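With SparkAsyncLock and fork removed, the run-in-background-then-join pattern they implemented by hand (an actor setting a flag under wait/notify) is what the commit title says a future should replace. A minimal sketch, assuming the era's scala.actors.Futures API rather than anything shown in this commit:

import scala.actors.Futures

// Futures.future runs the body on a background thread and returns a
// Future[T], which extends Function0[T].
val answer = Futures.future {
  1 + 1  // stand-in for real work
}

// Applying the future blocks until the body completes -- the same
// join that SparkAsyncLock.join provided via wait/notify.
println(answer())  // prints 2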