Skip to content
Snippets Groups Projects
Commit 718e8c20 authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Change url format to spark://host1:port1,host2:port2

This replaces the format of spark://host1:port1,spark//host2:port2 and is more
consistent with ZooKeeper's zk:// urls.
parent e1190229
No related branches found
No related tags found
No related merge requests found
...@@ -153,7 +153,7 @@ class SparkContext( ...@@ -153,7 +153,7 @@ class SparkContext(
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters // Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r val SPARK_REGEX = """spark://(.*)""".r
//Regular expression for connection to Mesos cluster //Regular expression for connection to Mesos cluster
val MESOS_REGEX = """(mesos://.*)""".r val MESOS_REGEX = """(mesos://.*)""".r
...@@ -169,7 +169,7 @@ class SparkContext( ...@@ -169,7 +169,7 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) => case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this) val scheduler = new ClusterScheduler(this)
val masterUrls = sparkUrl.split(",") val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend) scheduler.initialize(backend)
scheduler scheduler
......
...@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState ...@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState
/** /**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master. * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
* Execute using
* ./spark-class org.apache.spark.deploy.FaultToleranceTest
* *
* In order to mimic a real distributed cluster more closely, Docker is used. * In order to mimic a real distributed cluster more closely, Docker is used.
* Unfortunately, this dependency means that the suite cannot be run automatically without a * Unfortunately, this dependency means that the suite cannot be run automatically without a
...@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging { ...@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
assertTrue(sparkHome != null, "Run with a valid SPARK_HOME") assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
val containerSparkHome = "/opt/spark" val containerSparkHome = "/opt/spark"
val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome) val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
...@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging { ...@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging {
} }
def addMasters(num: Int) { def addMasters(num: Int) {
(1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) } (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
} }
def addWorkers(num: Int) { def addWorkers(num: Int) {
val masterUrls = getMasterUrls(masters) val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) } (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
} }
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */ /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
...@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging { ...@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
} }
def getMasterUrls(masters: Seq[TestMasterInfo]): String = { def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
masters.map(master => "spark://" + master.ip + ":7077").mkString(",") "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
} }
def getLeader: TestMasterInfo = { def getLeader: TestMasterInfo = {
......
...@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master ...@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master
/** /**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description, * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur. * and a listener for cluster events, and calls back the listener when various events occur.
*
* @param masterUrls Each url should look like spark://host:port.
*/ */
private[spark] class Client( private[spark] class Client(
actorSystem: ActorSystem, actorSystem: ActorSystem,
......
...@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI ...@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.util.{Utils, AkkaUtils}
/**
* @param masterUrls Each url should look like spark://host:port.
*/
private[spark] class Worker( private[spark] class Worker(
host: String, host: String,
port: Int, port: Int,
......
...@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) { ...@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
if (masters != null) { // Two positional arguments were given if (masters != null) { // Two positional arguments were given
printUsageAndExit(1) printUsageAndExit(1)
} }
masters = value.split(",") masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
parse(tail) parse(tail)
case Nil => case Nil =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment