diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 408692ec9c495eb6135c562608ca1e660505e8da..f60e56d95963465bb0b6223e92afbac04b1447f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -27,7 +27,7 @@ import akka.pattern.AskTimeoutException import akka.pattern.ask import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -49,7 +49,7 @@ private[spark] class Client( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 - var prevMaster: ActorRef = null // set for unwatching, when it fails. + var masterAddress: Address = null var actor: ActorRef = null var appId: String = null var registered = false @@ -103,11 +103,14 @@ private[spark] class Client( def changeMaster(url: String) { activeMasterUrl = url master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName, host, port.toInt) + case x => throw new SparkException("Invalid spark URL:"+x) + } } override def receive = { case RegisteredApplication(appId_, masterUrl) => - prevMaster = sender appId = appId_ registered = true changeMaster(masterUrl) @@ -137,7 +140,7 @@ private[spark] class Client( alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - case DisassociatedEvent(_, address, _) => + case DisassociatedEvent(_, address, _) if address == masterAddress => logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 81fb5c4e43255fa4fb976d76f624a078cb01193f..0e2b461b13e843d6013e39a4bc0ed0d568dc950e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -517,9 +517,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } private[spark] object Master { - private val systemName = "sparkMaster" + val systemName = "sparkMaster" private val actorName = "Master" - private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings)