diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala index 525cf9747f313489ad3d8b39290476fc302dd410..95ad6c5b59969ed8176ec70e165063e8ff506f7d 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala @@ -64,7 +64,7 @@ class CoarseMesosScheduler( def receive = { case RegisterSlave(slaveId, host) => slaveActor(slaveId) = sender - logInfo("Slave actor: " + sender) + logInfo("Registered slave: " + sender + " with ID " + slaveId) slaveHost(slaveId) = host freeCores(slaveId) = coresPerSlave makeFakeOffers() @@ -96,7 +96,7 @@ class CoarseMesosScheduler( } } - val masterActor: ActorRef = actorSystem.actorOf(Props[MasterActor], name = actorName) + val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName) val taskIdsOnSlave = new HashMap[String, HashSet[String]] @@ -284,24 +284,28 @@ class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) { generation = 0 def run(id: Int): Unit = { - val actorSystem = SparkEnv.get.actorSystem - val actor = actorSystem.actorOf(Props(new WorkerActor(slaveId, host)), name = "WorkerActor") + val env = SparkEnv.get + val classLoader = currentThread.getContextClassLoader + val actor = env.actorSystem.actorOf( + Props(new WorkerActor(slaveId, host, env, classLoader)), + name = "WorkerActor") + // Wait forever so that our Mesos task doesn't end while (true) { Thread.sleep(10000) } } } -class WorkerActor(slaveId: String, host: String) extends Actor with Logging { - val env = SparkEnv.get - val classLoader = currentThread.getContextClassLoader +class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader) + extends Actor with Logging { + val threadPool = new ThreadPoolExecutor( 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) val masterIp: String = System.getProperty("spark.master.host", "localhost") val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt val masterActor = env.actorSystem.actorFor( - "akka://spark@%s:%s/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) + "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler")) class TaskRunner(desc: MTaskInfo) extends Runnable {