diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index e73b780fcbc856e6a1dea731e87bb971f7159149..e83368b98ddf0cff22aa3eb6d853c2fb36be2dda 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -118,20 +118,28 @@ private[spark] class MesosSchedulerBackend(
     return execArgs
   }
 
-  private def setClassLoader() {
-    // Since native code starts the thread our callbacks run in, it may not correctly
-    // inherit and custom class loaders. Therefore, set the class loader manually.
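+  // Mesos starts the threads that run these callbacks from native code, so they may not correctly
+  // inherit Spark's custom class loader. Set it explicitly and return the previous loader so the
+  // callback can restore it once it finishes.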
+  private def setClassLoader(): ClassLoader = {
+    val oldClassLoader = Thread.currentThread.getContextClassLoader
     Thread.currentThread.setContextClassLoader(classLoader)
+    return oldClassLoader
+  }
+
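+  // Restore the class loader that setClassLoader() saved, after the callback has finished.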
+  private def restoreClassLoader(oldClassLoader: ClassLoader) {
+    Thread.currentThread.setContextClassLoader(oldClassLoader)
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    setClassLoader()
-    logInfo("Registered as framework ID " + frameworkId.getValue)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Registered as framework ID " + frameworkId.getValue)
+      registeredLock.synchronized {
+        isRegistered = true
+        registeredLock.notifyAll()
+      }
+    } finally {
+      restoreClassLoader(oldClassLoader)
     }
   }
 
@@ -153,50 +161,54 @@ private[spark] class MesosSchedulerBackend(
    * tasks are balanced across the cluster.
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    setClassLoader()
-    synchronized {
-      // Build a big list of the offerable workers, and remember their indices so that we can
-      // figure out which Offer to reply to for each worker
-      val offerableIndices = new ArrayBuffer[Int]
-      val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
-      def enoughMemory(o: Offer) = {
-        val mem = getResource(o.getResourcesList, "mem")
-        val slaveId = o.getSlaveId.getValue
-        mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
-      }
+    val oldClassLoader = setClassLoader()
+    try {
+      synchronized {
+        // Build a big list of the offerable workers, and remember their indices so that we can
+        // figure out which Offer to reply to for each worker
+        val offerableIndices = new ArrayBuffer[Int]
+        val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+        def enoughMemory(o: Offer) = {
+          val mem = getResource(o.getResourcesList, "mem")
+          val slaveId = o.getSlaveId.getValue
+          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+        }
 
-      for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-        offerableIndices += index
-        offerableWorkers += new WorkerOffer(
-          offer.getSlaveId.getValue,
-          offer.getHostname,
-          getResource(offer.getResourcesList, "cpus").toInt)
-      }
+        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+          offerableIndices += index
+          offerableWorkers += new WorkerOffer(
+            offer.getSlaveId.getValue,
+            offer.getHostname,
+            getResource(offer.getResourcesList, "cpus").toInt)
+        }
 
-      // Call into the ClusterScheduler
-      val taskLists = scheduler.resourceOffers(offerableWorkers)
-
-      // Build a list of Mesos tasks for each slave
-      val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
-      for ((taskList, index) <- taskLists.zipWithIndex) {
-        if (!taskList.isEmpty) {
-          val offerNum = offerableIndices(index)
-          val slaveId = offers(offerNum).getSlaveId.getValue
-          slaveIdsWithExecutors += slaveId
-          mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
-          for (taskDesc <- taskList) {
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
-            mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+        // Call into the ClusterScheduler
+        val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+        // Build a list of Mesos tasks for each slave
+        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+        for ((taskList, index) <- taskLists.zipWithIndex) {
+          if (!taskList.isEmpty) {
+            val offerNum = offerableIndices(index)
+            val slaveId = offers(offerNum).getSlaveId.getValue
+            slaveIdsWithExecutors += slaveId
+            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+            for (taskDesc <- taskList) {
+              taskIdToSlaveId(taskDesc.taskId) = slaveId
+              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+            }
           }
         }
-      }
 
-      // Reply to the offers
-      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-      for (i <- 0 until offers.size) {
-        d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+        // Reply to the offers
+        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+        for (i <- 0 until offers.size) {
+          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+        }
       }
+    } finally {
+      restoreClassLoader(oldClassLoader)
     }
   }
 
@@ -236,24 +248,33 @@ private[spark] class MesosSchedulerBackend(
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    setClassLoader()
-    val tid = status.getTaskId.getValue.toLong
-    val state = TaskState.fromMesos(status.getState)
-    synchronized {
-      if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
-        // We lost the executor on this slave, so remember that it's gone
-        slaveIdsWithExecutors -= taskIdToSlaveId(tid)
-      }
-      if (isFinished(status.getState)) {
-        taskIdToSlaveId.remove(tid)
+    val oldClassLoader = setClassLoader()
+    try {
+      val tid = status.getTaskId.getValue.toLong
+      val state = TaskState.fromMesos(status.getState)
+      synchronized {
+        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+          // We lost the executor on this slave, so remember that it's gone
+          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+        }
+        if (isFinished(status.getState)) {
+          taskIdToSlaveId.remove(tid)
+        }
       }
+      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+    } finally {
+      restoreClassLoader(oldClassLoader)
     }
-    scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
   }
 
   override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
-    scheduler.error(message)
+    val oldClassLoader = setClassLoader()
+    try {
+      logError("Mesos error: " + message)
+      scheduler.error(message)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
   }
 
   override def stop() {
@@ -269,11 +290,16 @@ private[spark] class MesosSchedulerBackend(
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
   private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      slaveIdsWithExecutors -= slaveId.getValue
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Mesos slave lost: " + slaveId.getValue)
+      synchronized {
+        slaveIdsWithExecutors -= slaveId.getValue
+      }
+      scheduler.executorLost(slaveId.getValue, reason)
+    } finally {
+      restoreClassLoader(oldClassLoader)
     }
-    scheduler.executorLost(slaveId.getValue, reason)
   }
 
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {