Commit 14bf2fe0 authored by Matei Zaharia

Merge pull request #749 from benh/spark-executor-uri

Added property 'spark.executor.uri' for launching on Mesos.
parents 4ba4c3fe 529ac811
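
With spark.executor.uri set, a Mesos executor can run out of a distribution tarball fetched into the task sandbox instead of requiring Spark to be installed at the same path on every slave: the scheduler backends below attach the URI to the Mesos CommandInfo so the fetcher downloads and unpacks it. A minimal sketch of how a driver program might use the new property, assuming a pre-built Spark tarball has been uploaded somewhere the slaves can reach (the HDFS URL, master address, and job name are illustrative, not part of this commit):

import spark.SparkContext

object ExecutorUriExample {
  def main(args: Array[String]) {
    // Hypothetical location of a make-distribution.sh tarball; any URI the
    // Mesos fetcher can retrieve (hdfs://, http://, ...) should work here.
    System.setProperty("spark.executor.uri", "hdfs://namenode:9000/dist/spark.tgz")
    val sc = new SparkContext("mesos://master:5050", "ExecutorUriExample")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}

From the Spark shell, the SparkILoop change further down achieves the same effect by copying the SPARK_EXECUTOR_URI environment variable into the property before the context is created.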
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   }

   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val runScript = new File(sparkHome, "run").getCanonicalPath
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-      runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
         .setValue(value)
         .build())
     }
-    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "run").getCanonicalPath
+      command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.split('/').last.split('.').head
+      command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
   }

   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
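
The else branch relies on the Mesos fetcher downloading the URI into the task's sandbox and unpacking archives such as .tgz there; chopping the file name at the first '.' and globbing with '*' then matches the unpacked directory without hard-coding the extension or version suffix. A worked example of the string handling, with a purely illustrative URI (it can be pasted into a Scala REPL):

val uri = "hdfs://namenode:9000/dist/spark-0.8.0-snapshot.tgz"  // illustrative

// Last path segment, then everything before the first '.'.
val basename = uri.split('/').last.split('.').head
println(basename)  // prints: spark-0

// The coarse-grained backend would therefore generate a command like this,
// where "cd spark-0*" lands in whatever directory the tarball unpacked to
// (e.g. spark-0.8.0-snapshot/); the placeholder arguments are illustrative.
val cmd = "cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
  basename, "<driverUrl>", "<slaveId>", "<hostname>", 4)
println(cmd)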
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
         .setValue(value)
         .build())
     }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.split('/').last.split('.').head
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
       .build()
-    val command = CommandInfo.newBuilder()
-      .setValue(execScript)
-      .setEnvironment(environment)
-      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
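
Taken together, when spark.executor.uri is set this backend hands Mesos a CommandInfo whose URI list contains the distribution and whose command runs spark-executor out of the fetched directory. A hedged sketch of the resulting protobuf, built directly against the Mesos API with illustrative values (the environment variable is only an example of what sc.executorEnvs might carry):

import org.apache.mesos.Protos.{CommandInfo, Environment}

val uri = "http://repo.example.com/dist/spark-dist.tgz"    // illustrative URL
val basename = uri.split('/').last.split('.').head         // "spark-dist"

val env = Environment.newBuilder()
  .addVariables(Environment.Variable.newBuilder()
    .setName("SPARK_MEM").setValue("2g").build())          // example executor env var

val command = CommandInfo.newBuilder()
  .setEnvironment(env)
  .setValue("cd %s*; ./spark-executor".format(basename))   // run out of the fetched dir
  .addUris(CommandInfo.URI.newBuilder().setValue(uri))     // fetched into the task sandbox
  .build()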
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/conf" "$DISTDIR"
 cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"

 if [ "$1" == "tgz" ]; then
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
-      "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+      "org.apache.mesos" % "mesos" % "0.12.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
@@ -831,6 +831,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
   var sparkContext: SparkContext = null

   def createSparkContext(): SparkContext = {
+    val uri = System.getenv("SPARK_EXECUTOR_URI")
+    if (uri != null) {
+      System.setProperty("spark.executor.uri", uri)
+    }
     val master = this.master match {
       case Some(m) => m
       case None => {
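
A standalone driver can use the same bridge; a tiny sketch of the pattern (only the environment-variable and property names come from this commit):

// Forward SPARK_EXECUTOR_URI into the system property, if it is set,
// before any SparkContext is constructed.
Option(System.getenv("SPARK_EXECUTOR_URI")).foreach { uri =>
  System.setProperty("spark.executor.uri", uri)
}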
@@ -72,7 +72,10 @@ esac
 # hard to kill the child with stuff like Process.destroy(). However, for
 # the Spark shell, the wrapper is necessary to properly reset the terminal
 # when we exit, so we allow it to set a variable to launch with scala.
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+# We still fall back on java for the shell if this is a "release" created
+# from make-distribution.sh since it's possible scala is not installed
+# but we have everything we need to run the shell.
+if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then
   if [ "$SCALA_HOME" ]; then
     RUNNER="${SCALA_HOME}/bin/scala"
   else