diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index c29a30184af131cf46ce7d6ebf245107c755b0b7..fc1537f7963c44b4adb8e2e72c8786b92e6aaf70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.SparkException +import org.apache.spark.{SparkContext, SparkException} /** * Contains util methods to interact with Hadoop from Spark. @@ -34,10 +34,21 @@ class SparkHadoopUtil { UserGroupInformation.setConfiguration(conf) def runAsUser(user: String)(func: () => Unit) { - val ugi = UserGroupInformation.createRemoteUser(user) - ugi.doAs(new PrivilegedExceptionAction[Unit] { - def run: Unit = func() - }) + // if we are already running as the user intended there is no reason to do the doAs. It + // will actually break secure HDFS access as it doesn't fill in the credentials. Also if + // the user is UNKNOWN then we shouldn't be creating a remote unknown user + // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only + // in SparkContext. + val currentUser = Option(System.getProperty("user.name")). + getOrElse(SparkContext.SPARK_UNKNOWN_USER) + if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) { + val ugi = UserGroupInformation.createRemoteUser(user) + ugi.doAs(new PrivilegedExceptionAction[Unit] { + def run: Unit = func() + }) + } else { + func() + } } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 32901a508f53b34d44c01fc631e9927680bd312a..47e958b5e6f4bfdb4380ce81d6fb206bb9d04f87 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -132,6 +132,8 @@ class HadoopRDD[K, V]( override def getPartitions: Array[Partition] = { val jobConf = getJobConf() + // add the credentials here as this can be called before SparkContext initialized + SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(jobConf)