Skip to content
Snippets Groups Projects
Commit f7de6978 authored by Charles Reiss's avatar Charles Reiss
Browse files

Use Mesos ExecutorIDs to hold SlaveIDs. Then we can safely use

the Mesos ExecutorID as a Spark ExecutorID.
parent 64ba6a8c
No related branches found
No related tags found
No related merge requests found
...@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor) ...@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties) executor.initialize(
slaveInfo.getId.getValue + "-" + executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
properties
)
} }
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
......
...@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Long, String] val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks // An ExecutorInfo for our tasks
var executorInfo: ExecutorInfo = null var execArgs: Array[Byte] = null
override def start() { override def start() {
synchronized { synchronized {
...@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend( ...@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
} }
}.start() }.start()
executorInfo = createExecutorInfo()
waitForRegister() waitForRegister()
} }
} }
def createExecutorInfo(): ExecutorInfo = { def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " + "Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor")) "property, the SPARK_HOME environment variable or the SparkContext constructor"))
...@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment) .setEnvironment(environment)
.build() .build()
ExecutorInfo.newBuilder() ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build()) .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command) .setCommand(command)
.setData(ByteString.copyFrom(createExecArg())) .setData(ByteString.copyFrom(createExecArg()))
.addResources(memory) .addResources(memory)
...@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend( ...@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend(
* containing all the spark.* system properties in the form of (String, String) pairs. * containing all the spark.* system properties in the form of (String, String) pairs.
*/ */
private def createExecArg(): Array[Byte] = { private def createExecArg(): Array[Byte] = {
val props = new HashMap[String, String] if (execArgs == null) {
val iterator = System.getProperties.entrySet.iterator val props = new HashMap[String, String]
while (iterator.hasNext) { val iterator = System.getProperties.entrySet.iterator
val entry = iterator.next while (iterator.hasNext) {
val (key, value) = (entry.getKey.toString, entry.getValue.toString) val entry = iterator.next
if (key.startsWith("spark.")) { val (key, value) = (entry.getKey.toString, entry.getValue.toString)
props(key) = value if (key.startsWith("spark.")) {
props(key) = value
}
} }
// Serialize the map as an array of (String, String) pairs
execArgs = Utils.serialize(props.toArray)
} }
// Serialize the map as an array of (String, String) pairs return execArgs
return Utils.serialize(props.toArray)
} }
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
...@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend(
return MesosTaskInfo.newBuilder() return MesosTaskInfo.newBuilder()
.setTaskId(taskId) .setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(executorInfo) .setExecutor(createExecutorInfo(slaveId))
.setName(task.name) .setName(task.name)
.addResources(cpuResource) .addResources(cpuResource)
.setData(ByteString.copyFrom(task.serializedTask)) .setData(ByteString.copyFrom(task.serializedTask))
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment