Commit cdaa0fad authored by Josh Rosen

Use external addresses in standalone WebUI on EC2.

parent 8d3713c2
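The commit message above comes down to one rule, applied identically by the master and the worker in the hunks below: if SPARK_PUBLIC_DNS is set, advertise that hostname in web UI links; otherwise fall back to the daemon's bind address. On EC2 the start scripts set SPARK_PUBLIC_DNS automatically from the instance metadata service, so links work from outside EC2's internal network. A minimal sketch of that rule (not Spark code; the object and method names are illustrative only):

object PublicAddressSketch {
  // 'bindAddress' and 'webUiPort' stand in for the daemon's own fields.
  def advertisedAddress(bindAddress: String): String = {
    val envVar = System.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else bindAddress
  }

  def webUiUrl(bindAddress: String, webUiPort: Int): String =
    "http://" + advertisedAddress(bindAddress) + ":" + webUiPort

  def main(args: Array[String]): Unit = {
    // With SPARK_PUBLIC_DNS unset this prints the internal EC2 hostname;
    // with it exported (as the modified start scripts do) it prints the public one.
    println(webUiUrl("ip-10-0-0-12.ec2.internal", 8080))
  }
}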
bin/start-master.sh
@@ -7,4 +7,13 @@ bin=`cd "$bin"; pwd`
 
 . "$bin/spark-config.sh"
 
-"$bin"/spark-daemon.sh start spark.deploy.master.Master
\ No newline at end of file
+# Set SPARK_PUBLIC_DNS so the master reports the correct web UI address to the slaves
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+    # If we appear to be running on EC2, use the public address by default:
+    if [[ `hostname` == *ec2.internal ]]; then
+        echo "RUNNING ON EC2"
+        export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+    fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.master.Master
bin/start-slave.sh (new file)
+#!/usr/bin/env bash
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# Set SPARK_PUBLIC_DNS so slaves can be linked in the master web UI
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+    # If we appear to be running on EC2, use the public address by default:
+    if [[ `hostname` == *ec2.internal ]]; then
+        echo "RUNNING ON EC2"
+        export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+    fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
bin/start-slaves.sh
@@ -18,6 +18,7 @@ if [ "$SPARK_MASTER_IP" = "" ]; then
     SPARK_MASTER_IP=`hostname`
 fi
 
-echo "Master IP: $ip"
+echo "Master IP: $SPARK_MASTER_IP"
 
-"$bin"/spark-daemons.sh start spark.deploy.worker.Worker spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+# Launch the slaves
+exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
spark/deploy/DeployMessage.scala
@@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable
 
 // Worker to Master
 
 private[spark]
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+case class RegisterWorker(
+    id: String,
+    host: String,
+    port: Int,
+    cores: Int,
+    memory: Int,
+    webUiPort: Int,
+    publicAddress: String)
   extends DeployMessage
 
 private[spark]
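To make the two addresses concrete, here is a simplified sketch of a registration message a worker behind EC2's NAT might send: it binds and communicates on its internal hostname but reports the public one for linking. All values are made up for illustration; only the field list mirrors the new message.

object RegisterWorkerExample {
  // Simplified copy of the message shape, so the example is self-contained.
  case class RegisterWorker(
      id: String, host: String, port: Int, cores: Int, memory: Int,
      webUiPort: Int, publicAddress: String)

  def main(args: Array[String]): Unit = {
    val msg = RegisterWorker(
      id = "worker-20120908120000-ip-10-0-0-12",               // hypothetical ID
      host = "ip-10-0-0-12.ec2.internal",                      // internal bind address
      port = 7078, cores = 4, memory = 6144,
      webUiPort = 8081,
      publicAddress = "ec2-54-0-0-12.compute-1.amazonaws.com") // from SPARK_PUBLIC_DNS

    // The master web UI will now link to the public address rather than the bind host.
    println("http://" + msg.publicAddress + ":" + msg.webUiPort)
  }
}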
spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  val masterPublicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else ip
+  }
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each job
   // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
@@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
+    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
       if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
+        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
         context.watch(sender)  // This doesn't work with remote actors but helps for testing
-        sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
+        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
         schedule()
       }
     }
@@ -196,8 +201,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
   }
 
-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
+  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+      publicAddress: String): WorkerInfo = {
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(sender) = worker
spark/deploy/master/WorkerInfo.scala
@@ -10,7 +10,8 @@ private[spark] class WorkerInfo(
   val cores: Int,
   val memory: Int,
   val actor: ActorRef,
-  val webUiPort: Int) {
+  val webUiPort: Int,
+  val publicAddress: String) {
 
   var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
 
@@ -37,8 +38,8 @@ private[spark] class WorkerInfo(
   def hasExecutor(job: JobInfo): Boolean = {
     executors.values.exists(_.job == job)
   }
 
   def webUiAddress : String = {
-    "http://" + this.host + ":" + this.webUiPort
+    "http://" + this.publicAddress + ":" + this.webUiPort
   }
 }
spark/deploy/worker/Worker.scala
@@ -36,6 +36,10 @@ private[spark] class Worker(
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val publicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else ip
+  }
 
   var coresUsed = 0
   var memoryUsed = 0
 
@@ -79,7 +83,7 @@ private[spark] class Worker(
     val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
     try {
       master = context.actorFor(akkaUrl)
-      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
+      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
       context.watch(master)  // Doesn't work with remote actors, but useful for testing
     } catch {
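Note that the master and the worker each read SPARK_PUBLIC_DNS independently at startup, so the variable has to be present in the environment of whichever script launches that daemon; the modified bin scripts arrange this by exporting it just before invoking spark-daemon.sh. Because the scripts only set the variable when it is empty, an operator on a non-EC2 host that is also behind NAT should be able to export SPARK_PUBLIC_DNS manually before running the start scripts and get the same behaviour.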
worker_row.scala.html (master web UI template)
@@ -4,7 +4,7 @@
 
 <tr>
   <td>
-    <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
+    <a href="@worker.webUiAddress">@worker.id</href>
   </td>
   <td>@{worker.host}:@{worker.port}</td>
   <td>@worker.cores (@worker.coresUsed Used)</td>