From 560e44a8e1d5a2cf42bf640090016f6201c6fbd7 Mon Sep 17 00:00:00 2001 From: Prashant Sharma <prashant.s@imaginea.com> Date: Tue, 26 Nov 2013 17:50:29 +0530 Subject: [PATCH] Restored master address for client. --- .../scala/org/apache/spark/deploy/client/Client.scala | 11 +++++++---- .../scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 408692ec9c..f60e56d959 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -27,7 +27,7 @@ import akka.pattern.AskTimeoutException import akka.pattern.ask import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -49,7 +49,7 @@ private[spark] class Client( val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 - var prevMaster: ActorRef = null // set for unwatching, when it fails. + var masterAddress: Address = null var actor: ActorRef = null var appId: String = null var registered = false @@ -103,11 +103,14 @@ private[spark] class Client( def changeMaster(url: String) { activeMasterUrl = url master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName, host, port.toInt) + case x => throw new SparkException("Invalid spark URL:"+x) + } } override def receive = { case RegisteredApplication(appId_, masterUrl) => - prevMaster = sender appId = appId_ registered = true changeMaster(masterUrl) @@ -137,7 +140,7 @@ private[spark] class Client( alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - case DisassociatedEvent(_, address, _) => + case DisassociatedEvent(_, address, _) if address == masterAddress => logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 81fb5c4e43..0e2b461b13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -517,9 +517,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } private[spark] object Master { - private val systemName = "sparkMaster" + val systemName = "sparkMaster" private val actorName = "Master" - private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) -- GitLab