diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index ca7fab4cc5ff95f318b33fc2b82e09f870d83f03..e73b780fcbc856e6a1dea731e87bb971f7159149 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
   // An ExecutorInfo for our tasks
   var execArgs: Array[Byte] = null
 
+  var classLoader: ClassLoader = null
+
   override def start() {
     synchronized {
+      classLoader = Thread.currentThread.getContextClassLoader
+
       new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
         override def run() {
@@ -114,9 +118,16 @@ private[spark] class MesosSchedulerBackend(
     return execArgs
   }
 
+  private def setClassLoader() {
+    // Since native code starts the thread our callbacks run in, it may not correctly
+    // inherit any custom class loaders. Therefore, set the class loader manually.
+    Thread.currentThread.setContextClassLoader(classLoader)
+  }
+
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    setClassLoader()
     logInfo("Registered as framework ID " + frameworkId.getValue)
     registeredLock.synchronized {
       isRegistered = true
@@ -142,6 +153,7 @@ private[spark] class MesosSchedulerBackend(
    * tasks are balanced across the cluster.
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    setClassLoader()
     synchronized {
       // Build a big list of the offerable workers, and remember their indices so that we can
       // figure out which Offer to reply to for each worker
@@ -224,6 +236,7 @@
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    setClassLoader()
     val tid = status.getTaskId.getValue.toLong
     val state = TaskState.fromMesos(status.getState)
     synchronized {
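
As a side note, the pattern this patch applies can be sketched in isolation: capture the context class loader on the thread that sets the backend up, then re-install it at the top of every callback, because threads created inside the Mesos native library do not inherit the JVM parent thread's context class loader. The sketch below is purely illustrative; ClassLoaderPropagationSketch, NativeCallbackHandler, and onEvent are hypothetical names, not Spark or Mesos API.

// Illustrative sketch only, not backend code: capture the context class loader on the
// constructing thread, restore it in callbacks that may run on natively created threads.
object ClassLoaderPropagationSketch {

  class NativeCallbackHandler {
    // Captured on the constructing thread (the "driver" thread in the patch above).
    private val capturedClassLoader: ClassLoader =
      Thread.currentThread.getContextClassLoader

    // Re-install the captured loader before doing any work that resolves classes.
    private def setClassLoader() {
      Thread.currentThread.setContextClassLoader(capturedClassLoader)
    }

    // A callback that might be invoked from a thread created by native code.
    def onEvent(className: String) {
      setClassLoader()
      val cls = Class.forName(className, true, Thread.currentThread.getContextClassLoader)
      println("Loaded " + cls.getName + " on thread " + Thread.currentThread.getName)
    }
  }

  def main(args: Array[String]) {
    val handler = new NativeCallbackHandler

    // Simulate a callback arriving on a thread whose context class loader was never
    // set up, similar to a thread spawned inside the Mesos native library.
    val callbackThread = new Thread(new Runnable {
      override def run() {
        handler.onEvent("java.lang.String")
      }
    })
    callbackThread.setContextClassLoader(null)
    callbackThread.start()
    callbackThread.join()
  }
}

Restoring the loader at the start of each callback is the simplest safe point, since the backend has no control over how the native library creates its threads.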