Skip to content
Snippets Groups Projects
Commit a8bfdd43 authored by Prashant Sharma
Browse files

Enabled remote death watch and a way to configure the timeouts for akka heartbeats.

parent c77ca1fe
No related branches found
No related tags found
No related merge requests found
...@@ -59,12 +59,12 @@ private[spark] class StandaloneExecutorBackend( ...@@ -59,12 +59,12 @@ private[spark] class StandaloneExecutorBackend(
driver = context.actorSelection(driverUrl) driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores) driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// context.watch(driver) // Doesn't work with remote actors, but useful for testing
} }
override def receive = { override def receive = {
case RegisteredExecutor(sparkProperties) => case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver") logInfo("Successfully registered with driver")
context.watch(sender) //Start watching for terminated messages.
// Make this host instead of hostPort ? // Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
...@@ -81,7 +81,7 @@ private[spark] class StandaloneExecutorBackend( ...@@ -81,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
} }
case DisassociatedEvent(_, _, _) => case Terminated(actor) =>
logError("Driver terminated or disconnected! Shutting down.") logError("Driver terminated or disconnected! Shutting down.")
System.exit(1) System.exit(1)
} }
......
...@@ -102,11 +102,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor ...@@ -102,11 +102,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case Terminated(actor) => case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated")) actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case DisassociatedEvent(_, remoteAddress, _) =>
addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
case AssociationErrorEvent(_, _, remoteAddress, _) =>
addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
} }
// Make fake resource offers on all executors // Make fake resource offers on all executors
......
...@@ -43,29 +43,35 @@ private[spark] object AkkaUtils { ...@@ -43,29 +43,35 @@ private[spark] object AkkaUtils {
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
// 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
val akkaConf = ConfigFactory.parseString(""" val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
akka.daemonic = on val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
akka.loggers = [""akka.event.slf4j.Slf4jLogger""] val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
akka.stdout-loglevel = "ERROR"
akka.actor.provider = "akka.remote.RemoteActorRefProvider" val akkaConf = ConfigFactory.parseString(
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" s"""
akka.remote.netty.tcp.hostname = "%s" |akka.daemonic = on
akka.remote.netty.tcp.port = %d |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
akka.remote.netty.tcp.connection-timeout = %d s |akka.stdout-loglevel = "ERROR"
akka.remote.netty.tcp.maximum-frame-size = %dMiB |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
akka.remote.netty.tcp.execution-pool-size = %d |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
akka.actor.default-dispatcher.throughput = %d |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
akka.remote.log-remote-lifecycle-events = %s |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
""".format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize, |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
lifecycleEvents)) |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
val actorSystem = ActorSystem(name, akkaConf) val actorSystem = ActorSystem(name, akkaConf)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get val boundPort = provider.getDefaultAddress.port.get
(actorSystem, boundPort) (actorSystem, boundPort)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment