diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 498fcc520ac5e5374ae41c7f848de0e3cf206392..e2df1b8954124bec328225bbae982c0f921861b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 
 import scala.collection.JavaConversions._
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
-class SparkHadoopUtil {
+class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeated in the same process
+   * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9ac7365f47f9fe55a1dbb58287b5ce6877ba65a7..e912ae8a5d3c56b8dd1bf87f88c7232869c14a2b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
-          workerUrl: Option[String]) {
-    // Debug code
-    Utils.checkHost(hostname)
-
-    val conf = new SparkConf
-    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
-    // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
-    // set it
-    val sparkHostPort = hostname + ":" + boundPort
-    actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-        sparkHostPort, cores),
-      name = "Executor")
-    workerUrl.foreach{ url =>
-      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+    workerUrl: Option[String]) {
+
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+        // Debug code
+        Utils.checkHost(hostname)
+
+        val conf = new SparkConf
+        // Create a new ActorSystem to run the backend, because we can't create a
+        // SparkEnv / Executor before getting started with all our system properties, etc
+        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+          indestructible = true, conf = conf, new SecurityManager(conf))
+        // set it
+        val sparkHostPort = hostname + ":" + boundPort
+        actorSystem.actorOf(
+          Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
+            sparkHostPort, cores),
+          name = "Executor")
+        workerUrl.foreach {
+          url =>
+            actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+        }
+        actorSystem.awaitTermination()
+
     }
-    actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 272bcda5f8f2f4d6c2b883bf7d30555c46380a27..98e7e0be813be72a99d76bd2aabc676bd80ef252 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,8 +128,6 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, serializedTask)
     runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@ private[spark] class Executor(
       }
     }
 
-    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    override def run() {
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 64e24506e80386cdcfd90d11533855bdded7469f..9b56f711e0e0b72991895538ffb4c1dd58580a96 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkHadoopUtil
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -95,9 +95,11 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+        MesosNativeLibrary.load()
+        // Create a new Executor and start it running
+        val runner = new MesosExecutorBackend()
+        new MesosExecutorDriver(runner).run()
+    }
   }
 }
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index fc13dbecb455577efd074236c43ee90af727e66a..8f0ecb855718e4d0cc504a82ed3fb2fe2fc37a39 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
-
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+      override def run() {
+
         var successed = false
         try {
           // Copy
@@ -480,6 +478,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 65b7215afbd4ca03ad5862ce5283297aa1680b2a..a3bd91590fc2520e7689aa7ce9b80f74a7adb746 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 90e807160d4b6508df7eb741b56952066022f684..c1dfe3f53b40b5737b79e84346860303fcafe380 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   private var registered = false
-  
-  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-        var successed = false
+      override def run() {
+
+      var successed = false
         try {
           // Copy
           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {
 
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ApplicationMaster(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ApplicationMaster(args).run()
+    }
   }
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a14bb377aa133fede811332903d2b2917a4fc5a0..a4ce8766d347cf8c634c8671af0b682cb153107e 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new ExecutorLauncher(args).run()
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      new ExecutorLauncher(args).run()
+    }
   }
 }