Skip to content
Snippets Groups Projects
Commit e47253e0 authored by Charles Reiss's avatar Charles Reiss
Browse files

Reset ClassLoader in MesosSchedulerBackend, too. (per review comments).

Also set ClassLoader for all Mesos callbacks, not just statusUpdate and
registered.
parent 8c1d1c98
No related branches found
No related tags found
No related merge requests found
......@@ -118,20 +118,28 @@ private[spark] class MesosSchedulerBackend(
return execArgs
}
private def setClassLoader() {
// Since native code starts the thread our callbacks run in, it may not correctly
// inherit any custom class loaders. Therefore, set the class loader manually.
private def setClassLoader(): ClassLoader = {
  // Capture the loader currently installed on this thread before replacing it, so the
  // caller can hand it back to restoreClassLoader once the callback is done.
  val callbackThread = Thread.currentThread
  val previousLoader = callbackThread.getContextClassLoader
  callbackThread.setContextClassLoader(classLoader)
  previousLoader
}
private def restoreClassLoader(oldClassLoader: ClassLoader): Unit = {
  // Reinstall the given loader as the current thread's context class loader; callers pass
  // the value previously returned by setClassLoader.
  val callbackThread = Thread.currentThread
  callbackThread.setContextClassLoader(oldClassLoader)
}
// Intentionally a no-op: the body is empty, so nothing is done with the rescinded offer here.
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
setClassLoader()
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
val oldClassLoader = setClassLoader()
try {
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
}
} finally {
restoreClassLoader(oldClassLoader)
}
}
......@@ -153,50 +161,54 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
setClassLoader()
synchronized {
// Build a big list of the offerable workers, and remember their indices so that we can
// figure out which Offer to reply to for each worker
val offerableIndices = new ArrayBuffer[Int]
val offerableWorkers = new ArrayBuffer[WorkerOffer]
def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue
mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
}
val oldClassLoader = setClassLoader()
try {
synchronized {
// Build a big list of the offerable workers, and remember their indices so that we can
// figure out which Offer to reply to for each worker
val offerableIndices = new ArrayBuffer[Int]
val offerableWorkers = new ArrayBuffer[WorkerOffer]
def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue
mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
offerableIndices += index
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
offerableIndices += index
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)
}
// Call into the ClusterScheduler
val taskLists = scheduler.resourceOffers(offerableWorkers)
// Build a list of Mesos tasks for each slave
val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
for ((taskList, index) <- taskLists.zipWithIndex) {
if (!taskList.isEmpty) {
val offerNum = offerableIndices(index)
val slaveId = offers(offerNum).getSlaveId.getValue
slaveIdsWithExecutors += slaveId
mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
for (taskDesc <- taskList) {
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
// Call into the ClusterScheduler
val taskLists = scheduler.resourceOffers(offerableWorkers)
// Build a list of Mesos tasks for each slave
val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
for ((taskList, index) <- taskLists.zipWithIndex) {
if (!taskList.isEmpty) {
val offerNum = offerableIndices(index)
val slaveId = offers(offerNum).getSlaveId.getValue
slaveIdsWithExecutors += slaveId
mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
for (taskDesc <- taskList) {
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
}
}
}
}
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
for (i <- 0 until offers.size) {
d.launchTasks(offers(i).getId, mesosTasks(i), filters)
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
for (i <- 0 until offers.size) {
d.launchTasks(offers(i).getId, mesosTasks(i), filters)
}
}
} finally {
restoreClassLoader(oldClassLoader)
}
}
......@@ -236,24 +248,33 @@ private[spark] class MesosSchedulerBackend(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
setClassLoader()
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
slaveIdsWithExecutors -= taskIdToSlaveId(tid)
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
val oldClassLoader = setClassLoader()
try {
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
slaveIdsWithExecutors -= taskIdToSlaveId(tid)
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
}
}
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
} finally {
restoreClassLoader(oldClassLoader)
}
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
}
override def error(d: SchedulerDriver, message: String) {
logError("Mesos error: " + message)
scheduler.error(message)
val oldClassLoader = setClassLoader()
try {
logError("Mesos error: " + message)
scheduler.error(message)
} finally {
restoreClassLoader(oldClassLoader)
}
}
override def stop() {
......@@ -269,11 +290,16 @@ private[spark] class MesosSchedulerBackend(
// Intentionally a no-op: the body is empty, so the message bytes are ignored here.
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
val oldClassLoader = setClassLoader()
try {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
scheduler.executorLost(slaveId.getValue, reason)
} finally {
restoreClassLoader(oldClassLoader)
}
scheduler.executorLost(slaveId.getValue, reason)
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment