diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index f705a5631a5c3fafd1def44c345e8cd4e1190ee5..a76a8e9730c313fb889af735b1370981d6aace0f 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class StandaloneExecutorBackend(
     driver = context.actorSelection(driverUrl)
     driver ! RegisterExecutor(executorId, hostPort, cores)
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    // context.watch(driver) // Doesn't work with remote actors, but useful for testing
   }

   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
+      context.watch(sender) // Start watching for Terminated messages.
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)

@@ -81,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }

-    case DisassociatedEvent(_, _, _) =>
+    case Terminated(actor) =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b6f0ec961a69c14d19c208e124274eaabd7e7986..2d09b329f2a820e64907a431f769e7e587751f68 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -102,11 +102,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))

-      case DisassociatedEvent(_, remoteAddress, _) =>
-        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
-      case AssociationErrorEvent(_, _, remoteAddress, _) =>
-        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
     }

   // Make fake resource offers on all executors
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8daf50ab6902b6bb9f86d94347cfe53039fe55f6..2a831382df92cd3f8096c2d0ccb037e2d27fbf7e 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -43,29 +43,35 @@ private[spark] object AkkaUtils {
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
-    // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
-    val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
-
-    val akkaConf = ConfigFactory.parseString("""
-      akka.daemonic = on
-      akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
-      akka.stdout-loglevel = "ERROR"
-      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      akka.remote.netty.tcp.hostname = "%s"
-      akka.remote.netty.tcp.port = %d
-      akka.remote.netty.tcp.connection-timeout = %d s
-      akka.remote.netty.tcp.maximum-frame-size = %dMiB
-      akka.remote.netty.tcp.execution-pool-size = %d
-      akka.actor.default-dispatcher.throughput = %d
-      akka.remote.log-remote-lifecycle-events = %s
-      """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
-        lifecycleEvents))
+    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
+    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
+    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+
+    val akkaConf = ConfigFactory.parseString(
+      s"""
+      |akka.daemonic = on
+      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
+      |akka.stdout-loglevel = "ERROR"
+      |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+      |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
+      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
+      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+      |akka.remote.netty.tcp.hostname = "$host"
+      |akka.remote.netty.tcp.port = $port
+      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
+      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+      """.stripMargin)

     val actorSystem = ActorSystem(name, akkaConf)

-    // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
-    // hack because Akka doesn't let you figure out the port through the public API yet.
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
     (actorSystem, boundPort)
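Note on the approach (not part of the patch): the change replaces transport-level DisassociatedEvent/AssociationErrorEvent handling with Akka death watch. Once an actor calls context.watch on the driver's ActorRef, it receives a Terminated message both when the driver actor stops and when the remote watch-failure-detector (tuned above via spark.akka.pauses, spark.akka.heartbeat.interval, and spark.akka.failure-detector.threshold) declares the driver's node unreachable. A minimal self-contained sketch of that pattern against Akka 2.2-era APIs; the Watched/Watcher/WatchDemo names are illustrative, not from the Spark source:

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}

// Illustrative sketch of the death-watch pattern the patch adopts.
class Watched extends Actor {
  def receive = Actor.emptyBehavior  // handles nothing; exists only to be watched
}

class Watcher(watched: ActorRef) extends Actor {
  // Death watch: after this call, this actor receives Terminated(watched) when
  // the watched actor stops -- or, for a remote actor, when the failure
  // detector declares its node unreachable. This is what lets the patch drop
  // the explicit DisassociatedEvent/AssociationErrorEvent cases.
  context.watch(watched)

  def receive = {
    case Terminated(actor) =>
      println(s"$actor terminated; cleaning up")
      context.system.shutdown()  // Akka 2.2-era shutdown call
  }
}

object WatchDemo extends App {
  val system = ActorSystem("watch-demo")
  val watched = system.actorOf(Props[Watched], "watched")
  system.actorOf(Props(classOf[Watcher], watched), "watcher")
  watched ! PoisonPill  // stopping the watched actor delivers Terminated to the watcher
}

For a remote watchee, akka.remote.watch-failure-detector.acceptable-heartbeat-pause (spark.akka.pauses above) bounds how long a silent peer survives before Terminated fires, which is why the patch raises it well above the Akka default.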