Skip to content
Snippets Groups Projects
Commit 560e44a8 authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Restored master address for client.

parent d092a8cc
No related branches found
No related tags found
No related merge requests found
......@@ -27,7 +27,7 @@ import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
import org.apache.spark.Logging
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
......@@ -49,7 +49,7 @@ private[spark] class Client(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
var prevMaster: ActorRef = null // set for unwatching, when it fails.
var masterAddress: Address = null
var actor: ActorRef = null
var appId: String = null
var registered = false
......@@ -103,11 +103,14 @@ private[spark] class Client(
def changeMaster(url: String) {
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName, host, port.toInt)
case x => throw new SparkException("Invalid spark URL:"+x)
}
}
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
prevMaster = sender
appId = appId_
registered = true
changeMaster(masterUrl)
......@@ -137,7 +140,7 @@ private[spark] class Client(
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
case DisassociatedEvent(_, address, _) =>
case DisassociatedEvent(_, address, _) if address == masterAddress =>
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
......
......@@ -517,9 +517,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
private[spark] object Master {
private val systemName = "sparkMaster"
val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment