From c6be4ffbf957d1512559efcbf10d11397efca154 Mon Sep 17 00:00:00 2001 From: Matei Zaharia <matei@eecs.berkeley.edu> Date: Fri, 29 Jun 2012 16:18:51 -0700 Subject: [PATCH] Fixes to CoarseMesosScheduler --- .../mesos/CoarseMesosScheduler.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala index 525cf9747f..95ad6c5b59 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala @@ -64,7 +64,7 @@ class CoarseMesosScheduler( def receive = { case RegisterSlave(slaveId, host) => slaveActor(slaveId) = sender - logInfo("Slave actor: " + sender) + logInfo("Registered slave: " + sender + " with ID " + slaveId) slaveHost(slaveId) = host freeCores(slaveId) = coresPerSlave makeFakeOffers() @@ -96,7 +96,7 @@ class CoarseMesosScheduler( } } - val masterActor: ActorRef = actorSystem.actorOf(Props[MasterActor], name = actorName) + val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName) val taskIdsOnSlave = new HashMap[String, HashSet[String]] @@ -284,24 +284,28 @@ class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) { generation = 0 def run(id: Int): Unit = { - val actorSystem = SparkEnv.get.actorSystem - val actor = actorSystem.actorOf(Props(new WorkerActor(slaveId, host)), name = "WorkerActor") + val env = SparkEnv.get + val classLoader = currentThread.getContextClassLoader + val actor = env.actorSystem.actorOf( + Props(new WorkerActor(slaveId, host, env, classLoader)), + name = "WorkerActor") + // Wait forever so that our Mesos task doesn't end while (true) { Thread.sleep(10000) } } } -class WorkerActor(slaveId: String, host: String) extends Actor with Logging { - val env = SparkEnv.get - val classLoader = currentThread.getContextClassLoader +class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader) + extends Actor with Logging { + val threadPool = new ThreadPoolExecutor( 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) val masterIp: String = System.getProperty("spark.master.host", "localhost") val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt val masterActor = env.actorSystem.actorFor( - "akka://spark@%s:%s/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) + "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) class TaskRunner(desc: MTaskInfo) extends Runnable { -- GitLab