diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c29a30184af131cf46ce7d6ebf245107c755b0b7..fc1537f7963c44b4adb8e2e72c8786b92e6aaf70 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // if we are already running as the user intended there is no reason to do the doAs. It
+    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0a4f10c94060bdcfa8c68634d446ac7521262aa3..5c9bb9db1ce9e9269f394f045a67980826db3c69 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -179,10 +179,7 @@ private[spark] class Executor(
       }
     }
 
-    // the runAsUser breaks secure HDFS access. It needs to add the credentials
-    // for the user if running as a user. Comment out for now.
-    //override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-    override def run(): Unit = {
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
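
For reference, a minimal standalone sketch (not part of the patch) of the guard that the new runAsUser introduces: impersonation via UserGroupInformation.doAs only happens when the requested user is known and differs from the user the JVM is already running as; otherwise the closure runs directly, keeping whatever credentials are already attached to the current UGI. The object name RunAsUserSketch and the "<unknown>" sentinel are illustrative placeholders (the real sentinel, SparkContext.SPARK_UNKNOWN_USER, is defined in SparkContext and not shown in this diff), and hadoop-common is assumed to be on the classpath.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object RunAsUserSketch {
  // Placeholder for SparkContext.SPARK_UNKNOWN_USER, whose definition is outside this diff.
  val SparkUnknownUser = "<unknown>"

  def runAsUser(user: String)(func: () => Unit): Unit = {
    // Fall back to the sentinel if the JVM has no user.name property at all.
    val currentUser = Option(System.getProperty("user.name")).getOrElse(SparkUnknownUser)
    if (user != SparkUnknownUser && currentUser != user) {
      // Target user is known and different: wrap the work in a doAs for that remote user.
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } else {
      // Already the intended user (or unknown): run directly, keeping existing credentials.
      func()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same user as the process, so this takes the direct path without impersonation.
    runAsUser(System.getProperty("user.name")) { () =>
      println(s"running as ${UserGroupInformation.getCurrentUser.getShortUserName}")
    }
  }
}

The Executor side of the patch then simply re-enables the previously commented-out wrapper, passing each task's body as the closure: override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => ... }.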