Commit f6f46455 authored by Benjamin Hindman

Added property 'spark.executor.uri' for launching on Mesos without
requiring Spark to be installed. Using 'make_distribution.sh', a user
can put a Spark distribution at a URI supported by Mesos (e.g.,
'hdfs://...') and then set that property when launching their job. Also
added SPARK_EXECUTOR_URI for the REPL.
parent 49be084e
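For context, here is a minimal sketch of how a driver might use the new property. Everything below (the HDFS path, job name, and workload) is hypothetical and not part of this commit; the only thing the commit requires is that spark.executor.uri point at a Spark distribution, such as one produced by make_distribution.sh, at a location Mesos can fetch from.

    import spark.SparkContext

    // Hypothetical driver-side setup: tell Mesos-launched executors where to
    // fetch a pre-built Spark distribution, so the slaves do not need Spark
    // installed locally. The URI below is an example only.
    System.setProperty("spark.executor.uri", "hdfs://namenode:9000/dists/spark-dist.tgz")

    // Assumes the (master URL, job name) SparkContext constructor from the
    // pre-0.8 'spark' package; adjust to whatever constructor your build exposes.
    val sc = new SparkContext("mesos://master:5050", "MyJob")
    println(sc.parallelize(1 to 1000).map(_ * 2).count())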
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val runScript = new File(sparkHome, "run").getCanonicalPath
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-      runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
         .setValue(value)
         .build())
     }
-    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "run").getCanonicalPath
+      command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
...
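A note on the basename/glob trick that both Mesos backends now use: split('.')(0) keeps only the part of the archive's file name up to the first dot, and the executor command then relies on a shell glob ("cd <prefix>*") to land in whatever directory Mesos extracted the archive into. A standalone sketch, with a hypothetical URI:

    import java.io.File

    // Hypothetical archive URI; only the file name matters here.
    val uri = "hdfs://namenode:9000/dists/spark-0.8.0-dist.tar.gz"

    // For "spark-0.8.0-dist.tar.gz" this yields "spark-0" (everything up to
    // the first '.'), not "spark-0.8.0-dist", which is why the launch command
    // uses a '*' glob rather than an exact directory name.
    val basename = new File(uri).getName().split('.')(0)
    println(basename)                                // spark-0
    println("cd %s*; ./run ...".format(basename))    // cd spark-0*; ./run ...

The glob assumes the archive unpacks into a single top-level directory sharing that prefix and that nothing else in the Mesos sandbox matches it, which is presumably why the original comment puts "correctly" in quotes.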
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
         .setValue(value)
         .build())
     }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
       .build()
-    val command = CommandInfo.newBuilder()
-      .setValue(execScript)
-      .setEnvironment(environment)
-      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
...
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/conf" "$DISTDIR"
 cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"
 
 if [ "$1" == "tgz" ]; then
...
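The fine-grained backend above runs ./spark-executor from inside the fetched distribution, which is why make_distribution.sh now copies the spark-executor script into the distribution alongside run and spark-shell. The intended workflow, as the commit message describes it, is roughly: build a distribution tarball (./make_distribution.sh tgz), upload it somewhere Mesos can fetch it (for example with hadoop fs -put), and set spark.executor.uri, or SPARK_EXECUTOR_URI for the REPL, to that URI as in the earlier sketch.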
@@ -176,7 +176,7 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
-      "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+      "org.apache.mesos" % "mesos" % "0.12.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
...
@@ -831,6 +831,8 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
   var sparkContext: SparkContext = null
 
   def createSparkContext(): SparkContext = {
+    val uri = System.getenv("SPARK_EXECUTOR_URI")
+    if (uri != null) System.setProperty("spark.executor.uri", uri)
     val master = this.master match {
       case Some(m) => m
       case None => {
...