diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 8b41620d9823280d193887275114670d74428172..48177a638a07e9672b0ebacfe5a358d6fcfd8e94 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,19 +1,16 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ActorRef, Props, Actor, Terminated} import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ -import akka.remote.RemoteClientLifeCycleEvent +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import java.text.SimpleDateFormat import java.util.Date -import akka.remote.RemoteClientShutdown -import akka.remote.RemoteClientDisconnected import spark.deploy.RegisterWorker import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed -import akka.actor.Terminated import java.io.File private[spark] class Worker( diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index e45288ff5300aa1b7edf1983576c53b972caf8d4..224c126fdd1eec87d34d98c33682c79ace097438 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -4,16 +4,15 @@ import java.nio.ByteBuffer import spark.Logging import spark.TaskState.TaskState import spark.util.AkkaUtils -import akka.actor.{ActorRef, Actor, Props} +import akka.actor.{ActorRef, Actor, Props, Terminated} +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import akka.remote.RemoteClientLifeCycleEvent import spark.scheduler.cluster._ import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.LaunchTask import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutor - private[spark] class StandaloneExecutorBackend( executor: Executor, driverUrl: String, @@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend( var driver: ActorRef = null override def preStart() { - try { - logInfo("Connecting to driver: " + driverUrl) - driver = context.actorFor(driverUrl) - driver ! RegisterExecutor(executorId, hostname, cores) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to driver", e) - System.exit(1) - } + logInfo("Connecting to driver: " + driverUrl) + driver = context.actorFor(driverUrl) + driver ! RegisterExecutor(executorId, hostname, cores) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(driver) // Doesn't work with remote actors, but useful for testing } override def receive = { @@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend( case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + logError("Driver terminated or disconnected! Shutting down.") + System.exit(1) } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {