diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 663dec2615673c7fb0f0a628d162abc24fa9947d..b1d1d30283a08b8401da77e283982f7f081962b2 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,11 +16,8 @@ import java.nio.ByteBuffer
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor extends Logging {
-  @volatile private var urlClassLoader : ExecutorURLClassLoader = null
-  @volatile private var threadPool: ExecutorService = null
-  @volatile private var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+  
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -30,52 +27,50 @@ private[spark] class Executor extends Logging {
 
   initLogging()
 
-  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
-    // Make sure the local hostname we report matches the cluster scheduler's name for this host
-    Utils.setCustomHostname(slaveHostname)
+  // Make sure the local hostname we report matches the cluster scheduler's name for this host
+  Utils.setCustomHostname(slaveHostname)
 
-    // Set spark.* system properties from executor arg
-    for ((key, value) <- properties) {
-      System.setProperty(key, value)
-    }
+  // Set spark.* system properties from executor arg
+  for ((key, value) <- properties) {
+    System.setProperty(key, value)
+  }
+
+  // Create our ClassLoader and set it on this thread
+  private val urlClassLoader = createClassLoader()
+  Thread.currentThread.setContextClassLoader(urlClassLoader)
 
-    // Create our ClassLoader and set it on this thread
-    urlClassLoader = createClassLoader()
-    Thread.currentThread.setContextClassLoader(urlClassLoader)
-
-    // Make any thread terminations due to uncaught exceptions kill the entire
-    // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-            
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
+  // Make any thread terminations due to uncaught exceptions kill the entire
+  // executor process to avoid surprising stalls.
+  Thread.setDefaultUncaughtExceptionHandler(
+    new Thread.UncaughtExceptionHandler {
+      override def uncaughtException(thread: Thread, exception: Throwable) {
+        try {
+          logError("Uncaught exception in thread " + thread, exception)
+          
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
             }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
+        } catch {
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
-    )
+    }
+  )
 
-    // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-    SparkEnv.set(env)
+  // Initialize Spark environment (using system properties read above)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  SparkEnv.set(env)
 
-    // Start worker thread pool
-    threadPool = new ThreadPoolExecutor(
-      1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-  }
+  // Start worker thread pool
+  val threadPool = new ThreadPoolExecutor(
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 818d6d1dda7f9774c60abfbf6ee0dff7730983a2..10f3531df0d350ee15487a0fc73ca6122a4150fd 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
   extends MesosExecutor
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ExecutorDriver = null
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(
+    executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
-      properties
-    )
+      properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
   }
 
   override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend(new Executor)
+    val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
   }
 }
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 9a82c3054c0fcf3ff6d0ecf5dc36c54d658fdaa0..1047f71c6ae0dc13eaf873049fbfeceaa4d1ff69 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
 import spark.scheduler.cluster.RegisterExecutor
 
 private[spark] class StandaloneExecutorBackend(
-    executor: Executor,
     driverUrl: String,
     executorId: String,
     hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ActorRef = null
 
   override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(executorId, hostname, sparkProperties)
+      executor = new Executor(executorId, hostname, sparkProperties)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
 
     case LaunchTask(taskDesc) =>
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }