Commit a099a63a authored by Matei Zaharia

Initial work to make Spark compile with Mesos 0.9 and Hadoop 1.0

parent a5e2b6a6
@@ -24,9 +24,15 @@ class Executor extends org.apache.mesos.Executor with Logging {
   initLogging()
 
-  override def init(d: ExecutorDriver, args: ExecutorArgs) {
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkId: FrameworkID,
+      frameworkInfo: FrameworkInfo,
+      slaveId: SlaveID,
+      slaveInfo: SlaveInfo) {
     // Read spark.* system properties from executor arg
-    val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray)
+    val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
     for ((key, value) <- props) {
       System.setProperty(key, value)
     }
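
Mesos 0.9 drops the old init(driver, args) callback in favor of the richer registered(...) callback, so the serialized spark.* properties now arrive through executorInfo.getData instead of a dedicated ExecutorArgs message. A minimal sketch of that property hand-off, assuming spark.Utils.serialize/deserialize wrap plain java.io object serialization (an assumption; the real helpers are outside this diff):

import java.io._

// Round-trip sketch of the executor argument, assuming plain Java
// serialization (the actual spark.Utils helpers are not shown in this diff).
object ExecArgRoundTrip {
  def serialize(props: Array[(String, String)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(props)
    out.close()
    bytes.toByteArray
  }

  def deserialize(bytes: Array[Byte]): Array[(String, String)] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    in.readObject().asInstanceOf[Array[(String, String)]]
  }

  def main(args: Array[String]) {
    // "spark.master.host" is just an illustrative property name.
    val props = Array(("spark.master.host", "localhost"))
    for ((key, value) <- deserialize(serialize(props))) {
      System.setProperty(key, value)
    }
    println(System.getProperty("spark.master.host"))  // prints "localhost"
  }
}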
@@ -172,7 +178,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
  */
 object Executor extends Logging {
   def main(args: Array[String]) {
-    System.loadLibrary("mesos")
+    MesosNativeLibrary.load()
     // Create a new Executor and start it running
     val exec = new Executor
     new MesosExecutorDriver(exec).run()
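
Both here and in SparkContext below, System.loadLibrary("mesos") gives way to MesosNativeLibrary.load(). A sketch of the resulting entry point; the note about MESOS_NATIVE_LIBRARY is an assumption about how the Mesos helper locates the native library:

import org.apache.mesos.{MesosExecutorDriver, MesosNativeLibrary}

object ExecutorMain {
  def main(args: Array[String]) {
    // MesosNativeLibrary.load() lets Mesos locate libmesos itself (typically
    // via the MESOS_NATIVE_LIBRARY environment variable, an assumption here),
    // whereas System.loadLibrary("mesos") needed java.library.path set correctly.
    MesosNativeLibrary.load()
    new MesosExecutorDriver(new Executor).run()  // Executor as defined in the hunks above
  }
}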
@@ -124,25 +124,29 @@ private class MesosScheduler(
         "property, the SPARK_HOME environment variable or the SparkContext constructor")
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
-    val params = Params.newBuilder()
+    val environment = Environment.newBuilder()
     for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
       if (System.getenv(key) != null) {
-        params.addParam(Param.newBuilder()
-          .setKey("env." + key)
-          .setValue(System.getenv(key))
-          .build())
+        environment.addVariables(Environment.Variable.newBuilder()
+          .setName(key)
+          .setValue(System.getenv(key))
+          .build())
       }
     }
+    environment.build()
     val memory = Resource.newBuilder()
       .setName("mem")
-      .setType(Resource.Type.SCALAR)
-      .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
       .build()
+    val command = CommandInfo.newBuilder()
+      .setValue(execScript)
+      .setEnvironment(environment)
+      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
-      .setUri(execScript)
+      .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
-      .setParams(params.build())
       .addResources(memory)
       .build()
   }
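
This hunk carries the biggest API shift in the commit: the Params/setUri pair is gone, the executor is now described by a CommandInfo holding its launch script and environment, and scalar resource types move from Resource.* to Value.*. A self-contained sketch of the whole builder chain, with hypothetical environment-variable names standing in for ENV_VARS_TO_SEND_TO_EXECUTORS:

import com.google.protobuf.ByteString
import org.apache.mesos.Protos._

object ExecutorInfoSketch {
  // SPARK_MEM and SPARK_CLASSPATH are illustrative stand-ins for whatever
  // ENV_VARS_TO_SEND_TO_EXECUTORS contains in MesosScheduler.
  val envVarsToSend = Seq("SPARK_MEM", "SPARK_CLASSPATH")

  def build(execScript: String, execArg: Array[Byte], memoryMb: Double): ExecutorInfo = {
    // Environment variables now travel inside CommandInfo, not as "env.*" Params.
    val environment = Environment.newBuilder()
    for (key <- envVarsToSend; value = System.getenv(key); if value != null) {
      environment.addVariables(
        Environment.Variable.newBuilder().setName(key).setValue(value).build())
    }
    // Scalar resources use the Value.* types in Mesos 0.9.
    val memory = Resource.newBuilder()
      .setName("mem")
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(memoryMb).build())
      .build()
    // The launch script that setUri used to point at is now CommandInfo.value.
    val command = CommandInfo.newBuilder()
      .setValue(execScript)
      .setEnvironment(environment)
      .build()
    ExecutorInfo.newBuilder()
      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
      .setCommand(command)
      .setData(ByteString.copyFrom(execArg))
      .addResources(memory)
      .build()
  }
}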
@@ -167,8 +167,8 @@ class SimpleJob(
     // Create and return the Mesos task object
     val cpuRes = Resource.newBuilder()
       .setName("cpus")
-      .setType(Resource.Type.SCALAR)
-      .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(CPUS_PER_TASK).build())
       .build()
     val serializedTask = Utils.serialize(task)
     logDebug("Serialized size: " + serializedTask.size)
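
The same Resource.Type to Value.Type move shows up here for the per-task CPU resource. A small helper capturing the Mesos 0.9 shape (hypothetical; the commit itself repeats the builder chain inline at both sites):

import org.apache.mesos.Protos._

object Resources {
  // Build a scalar resource ("cpus", "mem", ...) the Mesos 0.9 way: the
  // SCALAR enum and the Scalar message now live under Value, not Resource.
  def scalar(name: String, amount: Double): Resource =
    Resource.newBuilder()
      .setName(name)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
      .build()
}

// e.g. Resources.scalar("cpus", 1.0) for a task, Resources.scalar("mem", 512.0) for an executor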
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.mesos.MesosNativeLibrary
+
 import spark.broadcast._
 
 class SparkContext(
@@ -72,7 +74,7 @@ class SparkContext(
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
-        System.loadLibrary("mesos")
+        MesosNativeLibrary.load()
         new MesosScheduler(this, master, frameworkName)
     }
   }
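
For context, this case arm is the fall-through of SparkContext's match on the master URL: anything that is not a local[...] form loads the native library and hands off to MesosScheduler. An illustrative sketch of that dispatch; the LOCAL_* regexes are defined outside this hunk, so the patterns below are assumptions:

object MasterUrlSketch {
  // Assumed shapes of the LOCAL_* patterns; the real definitions in
  // SparkContext may differ.
  val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r

  def describe(master: String): String = master match {
    case "local" => "LocalScheduler, one thread"
    case LOCAL_N_REGEX(threads) => "LocalScheduler, " + threads + " threads"
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      "LocalScheduler, " + threads + " threads, " + maxFailures + " task failures allowed"
    case _ => "MesosScheduler, after MesosNativeLibrary.load()"
  }

  def main(args: Array[String]) {
    println(describe("local[4]"))    // LocalScheduler, 4 threads
    println(describe("host:5050"))   // any non-local URL falls through to Mesos
  }
}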
@@ -4,6 +4,9 @@ import sbtassembly.Plugin._
 import AssemblyKeys._
 
 object SparkBuild extends Build {
+  // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
+  // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
+  val HADOOP_VERSION = "0.20.205.0"
 
   lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
@@ -41,7 +44,8 @@ object SparkBuild extends Build {
     name := "spark-core",
     resolvers ++= Seq(
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
-      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/"
+      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
     ),
     libraryDependencies ++= Seq(
       "com.google.guava" % "guava" % "11.0.1",
@@ -49,7 +53,7 @@
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
       "com.ning" % "compress-lzf" % "0.8.4",
-      "org.apache.hadoop" % "hadoop-core" % "0.20.2",
+      "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
       "asm" % "asm-all" % "3.3.1",
       "com.google.protobuf" % "protobuf-java" % "2.3.0",
       "de.javakaffee" % "kryo-serializers" % "0.9",