From 13a19505e4d77c93a279d0273826974b4f4cb0a4 Mon Sep 17 00:00:00 2001
From: tgravescs <tgraves_cs@yahoo.com>
Date: Fri, 8 Nov 2013 12:04:09 -0600
Subject: [PATCH] Don't call doAs if the user is unknown or the same as the
 user that is already running

---
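Reviewer note: a minimal standalone sketch of the guard being added, assuming
Hadoop's UserGroupInformation is on the classpath; the SPARK_UNKNOWN_USER
constant below is a local stand-in for the sentinel that SparkContext defines
(its value here is an assumption).

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    object RunAsUserSketch {
      // Local stand-in for SparkContext.SPARK_UNKNOWN_USER (assumed value).
      val SPARK_UNKNOWN_USER = "<unknown>"

      def runAsUser(user: String)(func: () => Unit) {
        val currentUser = Option(System.getProperty("user.name"))
          .getOrElse(SPARK_UNKNOWN_USER)
        if (user != SPARK_UNKNOWN_USER && currentUser != user) {
          // Impersonate only when the target user is known and differs from
          // the process owner; doAs without credentials breaks secure HDFS.
          val ugi = UserGroupInformation.createRemoteUser(user)
          ugi.doAs(new PrivilegedExceptionAction[Unit] {
            def run: Unit = func()
          })
        } else {
          // Already the intended user (or unknown): run the closure directly,
          // preserving the current user's Hadoop credentials.
          func()
        }
      }
    }

Running the closure directly in the else branch preserves the current user's
credentials, which createRemoteUser would otherwise discard.
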
 .../apache/spark/deploy/SparkHadoopUtil.scala | 21 ++++++++++++++-----
 .../org/apache/spark/executor/Executor.scala  |  5 +----
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c29a30184a..fc1537f796 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // If we are already running as the intended user there is no reason to do the doAs;
+    // it would actually break secure HDFS access because it doesn't fill in the
+    // credentials. Also, if the user is UNKNOWN we shouldn't create a remote unknown
+    // user (this is actually the path Spark on YARN takes), since SPARK_USER is
+    // initialized only in SparkContext.
+    val currentUser = Option(System.getProperty("user.name"))
+      .getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0a4f10c940..5c9bb9db1c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -179,10 +179,7 @@ private[spark] class Executor(
       }
     }
 
-    // the runAsUser breaks secure HDFS access. It needs to add the credentials
-    // for the user if running as a user. Comment out for now. 
-    //override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-    override def run(): Unit = {
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
-- 
GitLab
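
A hypothetical usage mirroring the Executor call site above; the sketch object
and the SPARK_USER fallback are illustrative, not part of the patch:

    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse("<unknown>")
    RunAsUserSketch.runAsUser(sparkUser) { () =>
      println("task body runs as " + System.getProperty("user.name"))
    }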