Skip to content
Snippets Groups Projects
Commit 8c1d1c98 authored by Charles Reiss's avatar Charles Reiss
Browse files

Explicitly set class loader for MesosSchedulerDriver callbacks.

parent 744da8ee
No related branches found
No related tags found
No related merge requests found
...@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend( ...@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
// An ExecutorInfo for our tasks // An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null var execArgs: Array[Byte] = null
var classLoader: ClassLoader = null
override def start() { override def start() {
synchronized { synchronized {
classLoader = Thread.currentThread.getContextClassLoader
new Thread("MesosSchedulerBackend driver") { new Thread("MesosSchedulerBackend driver") {
setDaemon(true) setDaemon(true)
override def run() { override def run() {
...@@ -114,9 +118,16 @@ private[spark] class MesosSchedulerBackend( ...@@ -114,9 +118,16 @@ private[spark] class MesosSchedulerBackend(
return execArgs return execArgs
} }
private def setClassLoader() {
// Since native code starts the thread our callbacks run in, it may not correctly
// inherit and custom class loaders. Therefore, set the class loader manually.
Thread.currentThread.setContextClassLoader(classLoader)
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
setClassLoader()
logInfo("Registered as framework ID " + frameworkId.getValue) logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized { registeredLock.synchronized {
isRegistered = true isRegistered = true
...@@ -142,6 +153,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -142,6 +153,7 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster. * tasks are balanced across the cluster.
*/ */
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
setClassLoader()
synchronized { synchronized {
// Build a big list of the offerable workers, and remember their indices so that we can // Build a big list of the offerable workers, and remember their indices so that we can
// figure out which Offer to reply to for each worker // figure out which Offer to reply to for each worker
...@@ -224,6 +236,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -224,6 +236,7 @@ private[spark] class MesosSchedulerBackend(
} }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
setClassLoader()
val tid = status.getTaskId.getValue.toLong val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState) val state = TaskState.fromMesos(status.getState)
synchronized { synchronized {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment