Skip to content
Snippets Groups Projects
Commit 03eefbb2 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #451 from stephenh/fixdeathpactexception

Handle Terminated to avoid endless DeathPactExceptions.
parents a4611d66 870b2aaf
No related branches found
No related tags found
No related merge requests found
package spark.deploy.worker package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.mutable.{ArrayBuffer, HashMap}
import akka.actor.{ActorRef, Props, Actor, ActorSystem} import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import spark.{Logging, Utils} import spark.{Logging, Utils}
import spark.util.AkkaUtils import spark.util.AkkaUtils
import spark.deploy._ import spark.deploy._
import akka.remote.RemoteClientLifeCycleEvent import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
import akka.remote.RemoteClientShutdown
import akka.remote.RemoteClientDisconnected
import spark.deploy.RegisterWorker import spark.deploy.RegisterWorker
import spark.deploy.LaunchExecutor import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master import spark.deploy.master.Master
import akka.actor.Terminated
import java.io.File import java.io.File
private[spark] class Worker( private[spark] class Worker(
......
...@@ -4,16 +4,15 @@ import java.nio.ByteBuffer ...@@ -4,16 +4,15 @@ import java.nio.ByteBuffer
import spark.Logging import spark.Logging
import spark.TaskState.TaskState import spark.TaskState.TaskState
import spark.util.AkkaUtils import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props} import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._ import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend( private[spark] class StandaloneExecutorBackend(
executor: Executor, executor: Executor,
driverUrl: String, driverUrl: String,
...@@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend( ...@@ -27,17 +26,11 @@ private[spark] class StandaloneExecutorBackend(
var driver: ActorRef = null var driver: ActorRef = null
override def preStart() { override def preStart() {
try { logInfo("Connecting to driver: " + driverUrl)
logInfo("Connecting to driver: " + driverUrl) driver = context.actorFor(driverUrl)
driver = context.actorFor(driverUrl) driver ! RegisterExecutor(executorId, hostname, cores)
driver ! RegisterExecutor(executorId, hostname, cores) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(driver) // Doesn't work with remote actors, but useful for testing
context.watch(driver) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
logError("Failed to connect to driver", e)
System.exit(1)
}
} }
override def receive = { override def receive = {
...@@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend( ...@@ -52,6 +45,10 @@ private[spark] class StandaloneExecutorBackend(
case LaunchTask(taskDesc) => case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId) logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
} }
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment