diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f2687ce6b42b4dab9c227163f3f1c27b926c96b3..7c1c831c248fc7a7e19320ef2ae74b446ede4035 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -160,6 +160,8 @@ object Client { val (actorSystem, _) = AkkaUtils.createActorSystem( "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) + // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely + Master.toAkkaUrl(driverArgs.master) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 4efebcaa350fe42604ba3ea4f38ce7c37b434967..39a7b0319b6a120f91944edabb28bce4b790355e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -26,7 +26,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -47,6 +47,8 @@ private[spark] class AppClient( conf: SparkConf) extends Logging { + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) + val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -75,9 +77,9 @@ private[spark] class AppClient( } def tryRegisterAllMasters() { - for (masterUrl <- masterUrls) { - logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) + 
for (masterAkkaUrl <- masterAkkaUrls) { + logInfo("Connecting to master " + masterAkkaUrl + "...") + val actor = context.actorSelection(masterAkkaUrl) actor ! RegisterApplication(appDescription) } } @@ -103,20 +105,14 @@ private[spark] class AppClient( } def changeMaster(url: String) { + // activeMasterUrl is a valid Spark url since we receive it from master. activeMasterUrl = url master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = activeMasterUrl match { - case Master.sparkUrlRegex(host, port) => - Address("akka.tcp", Master.systemName, host, port.toInt) - case x => - throw new SparkException("Invalid spark URL: " + x) - } + masterAddress = Master.toAkkaAddress(activeMasterUrl) } private def isPossibleMaster(remoteUrl: Address) = { - masterUrls.map(s => Master.toAkkaUrl(s)) - .map(u => AddressFromURIString(u).hostPort) - .contains(remoteUrl.hostPort) + masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort) } override def receiveWithLogging = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e8a5cfc746fedcd1ceea457681cb547bfe79b48e..b1c015246b610c1d12086bd4797864b8385bec31 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -845,7 +845,6 @@ private[spark] class Master( private[spark] object Master extends Logging { val systemName = "sparkMaster" private val actorName = "Master" - val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r def main(argStrings: Array[String]) { SignalLogger.register(log) @@ -855,14 +854,24 @@ private[spark] object Master extends Logging { actorSystem.awaitTermination() } - /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ + /** + * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`. 
+ * + * @throws SparkException if the url is invalid + */ def toAkkaUrl(sparkUrl: String): String = { - sparkUrl match { - case sparkUrlRegex(host, port) => - "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) - case _ => - throw new SparkException("Invalid master URL: " + sparkUrl) - } + val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) + "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + } + + /** + * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`. + * + * @throws SparkException if the url is invalid + */ + def toAkkaAddress(sparkUrl: String): Address = { + val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) + Address("akka.tcp", systemName, host, port) } def startSystemAndActor( diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f0f3da5eec4dfc094a17a3a5117fffa542cd944a..13599830123d02a884028940c056ce099be8bacf 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} /** - * @param masterUrls Each url should look like spark://host:port. + * @param masterAkkaUrls Each url should be a valid akka url. */ private[spark] class Worker( host: String, @@ -48,7 +48,7 @@ private[spark] class Worker( webUiPort: Int, cores: Int, memory: Int, - masterUrls: Array[String], + masterAkkaUrls: Array[String], actorSystemName: String, actorName: String, workDirPath: String = null, @@ -171,15 +171,11 @@ private[spark] class Worker( } def changeMaster(url: String, uiUrl: String) { + // activeMasterUrl is a valid Spark url since we receive it from master. 
activeMasterUrl = url activeMasterWebUiUrl = uiUrl master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = activeMasterUrl match { - case Master.sparkUrlRegex(_host, _port) => - Address("akka.tcp", Master.systemName, _host, _port.toInt) - case x => - throw new SparkException("Invalid spark URL: " + x) - } + masterAddress = Master.toAkkaAddress(activeMasterUrl) connected = true // Cancel any outstanding re-registration attempts because we found a new master registrationRetryTimer.foreach(_.cancel()) @@ -187,9 +183,9 @@ private[spark] class Worker( } private def tryRegisterAllMasters() { - for (masterUrl <- masterUrls) { - logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) + for (masterAkkaUrl <- masterAkkaUrls) { + logInfo("Connecting to master " + masterAkkaUrl + "...") + val actor = context.actorSelection(masterAkkaUrl) actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) } } @@ -527,8 +523,9 @@ private[spark] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) + masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0d771baaa6abc7eba81887489754cb8926a94103..9d6b6161ce4daf76547624bb6256cc2a07445a91 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1842,6 +1842,35 @@ private[spark] object Utils 
extends Logging { sparkValue } } + + /** + * Return a pair of host and port extracted from the `sparkUrl`. + * + * A spark url (`spark://host:port`) is a special URI whose scheme is `spark` and which contains + * only a host and a port. + * + * @throws SparkException if `sparkUrl` is invalid. + */ + def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = { + try { + val uri = new java.net.URI(sparkUrl) + val host = uri.getHost + val port = uri.getPort + if (uri.getScheme != "spark" || + host == null || + port < 0 || + (uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null + uri.getFragment != null || + uri.getQuery != null || + uri.getUserInfo != null) { + throw new SparkException("Invalid master URL: " + sparkUrl) + } + (host, port) + } catch { + case e: java.net.URISyntaxException => + throw new SparkException("Invalid master URL: " + sparkUrl, e) + } + } } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..3d2335f9b36378a46fd92493cef74b454abb7646 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import akka.actor.Address +import org.scalatest.FunSuite + +import org.apache.spark.SparkException + +class MasterSuite extends FunSuite { + + test("toAkkaUrl") { + val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234") + assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl) + } + + test("toAkkaUrl: a typo url") { + val e = intercept[SparkException] { + Master.toAkkaUrl("spark://1.2. 3.4:1234") + } + assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage) + } + + test("toAkkaAddress") { + val address = Master.toAkkaAddress("spark://1.2.3.4:1234") + assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address) + } + + test("toAkkaAddress: a typo url") { + val e = intercept[SparkException] { + Master.toAkkaAddress("spark://1.2. 3.4:1234") + } + assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage) + } +}