Skip to content
Snippets Groups Projects
Commit 489862a6 authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Remote death watch has an intermittent bug: remove `context.watch`/`Terminated`-based master and executor tracking, handle disconnects via `DisassociatedEvent` instead, and drop the now-unused watch-failure-detector Akka settings.

parent 77929cfe
No related branches found
No related tags found
No related merge requests found
...@@ -23,7 +23,6 @@ import scala.concurrent.duration._ ...@@ -23,7 +23,6 @@ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import akka.actor._ import akka.actor._
import akka.actor.Terminated
import akka.pattern.AskTimeoutException import akka.pattern.AskTimeoutException
import akka.pattern.ask import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent} import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
...@@ -62,6 +61,7 @@ private[spark] class Client( ...@@ -62,6 +61,7 @@ private[spark] class Client(
var alreadyDead = false // To avoid calling listener.dead() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() { override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try { try {
registerWithMaster() registerWithMaster()
} catch { } catch {
...@@ -107,7 +107,6 @@ private[spark] class Client( ...@@ -107,7 +107,6 @@ private[spark] class Client(
override def receive = { override def receive = {
case RegisteredApplication(appId_, masterUrl) => case RegisteredApplication(appId_, masterUrl) =>
context.watch(sender)
prevMaster = sender prevMaster = sender
appId = appId_ appId = appId_
registered = true registered = true
...@@ -123,7 +122,7 @@ private[spark] class Client( ...@@ -123,7 +122,7 @@ private[spark] class Client(
val fullId = appId + "/" + id val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores)) logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory) listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) => case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = appId + "/" + id val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("") val messageText = message.map(s => " (" + s + ")").getOrElse("")
...@@ -134,13 +133,12 @@ private[spark] class Client( ...@@ -134,13 +133,12 @@ private[spark] class Client(
case MasterChanged(masterUrl, masterWebUiUrl) => case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl) logInfo("Master has changed, new master is at " + masterUrl)
context.unwatch(prevMaster)
changeMaster(masterUrl) changeMaster(masterUrl)
alreadyDisconnected = false alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId) sender ! MasterChangeAcknowledged(appId)
case Terminated(actor_) => case DisassociatedEvent(_, address, _) =>
logWarning(s"Connection to $actor_ failed; waiting for master to reconnect...") logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected() markDisconnected()
case StopClient => case StopClient =>
......
...@@ -147,9 +147,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -147,9 +147,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
RecoveryState.ALIVE RecoveryState.ALIVE
else else
RecoveryState.RECOVERING RecoveryState.RECOVERING
logInfo("I have been elected leader! New state: " + state) logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) { if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers) beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
...@@ -171,7 +169,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -171,7 +169,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else { } else {
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
registerWorker(worker) registerWorker(worker)
context.watch(sender)
persistenceEngine.addWorker(worker) persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl) sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule() schedule()
...@@ -186,7 +183,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -186,7 +183,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val app = createApplication(description, sender) val app = createApplication(description, sender)
registerApplication(app) registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id) logInfo("Registered app " + description.name + " with ID " + app.id)
context.watch(sender)
persistenceEngine.addApplication(app) persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl) sender ! RegisteredApplication(app.id, masterUrl)
schedule() schedule()
...@@ -262,15 +258,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -262,15 +258,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() } if (canCompleteRecovery) { completeRecovery() }
} }
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
logInfo(s"$actor got terminated, removing it.")
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case DisassociatedEvent(_, address, _) => { case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was // The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.") logInfo(s"$address got disassociated, removing it.")
...@@ -438,8 +425,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -438,8 +425,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None) exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec) exec.application.removeExecutor(exec)
} }
context.stop(worker.actor)
context.unwatch(worker.actor)
persistenceEngine.removeWorker(worker) persistenceEngine.removeWorker(worker)
} }
...@@ -502,8 +487,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -502,8 +487,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
app.driver ! ApplicationRemoved(state.toString) app.driver ! ApplicationRemoved(state.toString)
} }
persistenceEngine.removeApplication(app) persistenceEngine.removeApplication(app)
context.stop(app.driver)
context.unwatch(app.driver)
schedule() schedule()
} }
} }
......
...@@ -73,7 +73,6 @@ private[spark] class Worker( ...@@ -73,7 +73,6 @@ private[spark] class Worker(
val masterLock: Object = new Object() val masterLock: Object = new Object()
var master: ActorSelection = null var master: ActorSelection = null
var prevMaster: ActorRef = null
var activeMasterUrl: String = "" var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = "" var activeMasterWebUiUrl : String = ""
@volatile var registered = false @volatile var registered = false
...@@ -173,8 +172,6 @@ private[spark] class Worker( ...@@ -173,8 +172,6 @@ private[spark] class Worker(
case RegisteredWorker(masterUrl, masterWebUiUrl) => case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl) logInfo("Successfully registered with master " + masterUrl)
registered = true registered = true
context.watch(sender) // remote death watch for master
prevMaster = sender
changeMaster(masterUrl, masterWebUiUrl) changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
...@@ -185,8 +182,6 @@ private[spark] class Worker( ...@@ -185,8 +182,6 @@ private[spark] class Worker(
case MasterChanged(masterUrl, masterWebUiUrl) => case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl) logInfo("Master has changed, new master is at " + masterUrl)
context.unwatch(prevMaster)
prevMaster = sender
changeMaster(masterUrl, masterWebUiUrl) changeMaster(masterUrl, masterWebUiUrl)
val execs = executors.values. val execs = executors.values.
...@@ -245,10 +240,6 @@ private[spark] class Worker( ...@@ -245,10 +240,6 @@ private[spark] class Worker(
} }
} }
case Terminated(actor_) =>
logInfo(s"$actor_ terminated !")
masterDisconnected()
case x: DisassociatedEvent => case x: DisassociatedEvent =>
logInfo(s"$x Disassociated !") logInfo(s"$x Disassociated !")
masterDisconnected() masterDisconnected()
......
...@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend( ...@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receive = { override def receive = {
case RegisteredExecutor(sparkProperties) => case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver") logInfo("Successfully registered with driver")
context.watch(sender) //Start watching for terminated messages.
// Make this host instead of hostPort ? // Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
...@@ -76,10 +75,6 @@ private[spark] class CoarseGrainedExecutorBackend( ...@@ -76,10 +75,6 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.killTask(taskId) executor.killTask(taskId)
} }
case Terminated(actor) =>
logError(s"Driver $actor terminated, Shutting down.")
System.exit(1)
case x: DisassociatedEvent => case x: DisassociatedEvent =>
logError(s"Driver $x disassociated! Shutting down.") logError(s"Driver $x disassociated! Shutting down.")
System.exit(1) System.exit(1)
......
...@@ -73,7 +73,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac ...@@ -73,7 +73,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
} else { } else {
logInfo("Registered executor: " + sender + " with ID " + executorId) logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties) sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
executorActor(executorId) = sender executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1 executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores freeCores(executorId) = cores
...@@ -118,9 +117,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac ...@@ -118,9 +117,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
removeExecutor(executorId, reason) removeExecutor(executorId, reason)
sender ! true sender ! true
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case DisassociatedEvent(_, address, _) => case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
......
...@@ -46,20 +46,15 @@ private[spark] object AkkaUtils { ...@@ -46,20 +46,15 @@ private[spark] object AkkaUtils {
val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
// Since we have our own Heart Beat mechanism and TCP already tracks connections. val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt
// Using this makes very little sense. So setting this to a relatively larger value suffices.
val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
val akkaConf = ConfigFactory.parseString( val akkaConf = ConfigFactory.parseString(
s""" s"""
|akka.daemonic = on |akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""] |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
|akka.stdout-loglevel = "ERROR" |akka.stdout-loglevel = "ERROR"
|akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
|akka.remote.transport-failure-detector.heartbeat-interval = 30 s
|akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
|akka.remote.transport-failure-detector.threshold = $akkaFailureDetector |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
|akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
......
...@@ -136,3 +136,4 @@ fi ...@@ -136,3 +136,4 @@ fi
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment