Commit 13a19505 authored by tgravescs

Don't call doAs if the user is unknown or is the same user that is already running

parent f95cb04e
SparkHadoopUtil.scala:

@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // if we are already running as the user intended there is no reason to do the doAs. It
+    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**
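For context, here is a minimal, self-contained sketch of the guard this hunk introduces. The SPARK_UNKNOWN_USER value below is a local stand-in (the real constant lives on SparkContext and its value here is an assumption), RunAsUserSketch and the user name "alice" are hypothetical, and the Hadoop calls are the same UserGroupInformation APIs the patch uses:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object RunAsUserSketch {
  // Local stand-in for SparkContext.SPARK_UNKNOWN_USER (assumed value).
  val SPARK_UNKNOWN_USER = "<unknown>"

  def runAsUser(user: String)(func: () => Unit): Unit = {
    val currentUser = Option(System.getProperty("user.name")).getOrElse(SPARK_UNKNOWN_USER)
    if (user != SPARK_UNKNOWN_USER && currentUser != user) {
      // Impersonate only when the target user is known and differs from the current one.
      // createRemoteUser builds a UGI without credentials, which is why the doAs must be
      // skipped when we are already the right user on a secure cluster.
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } else {
      // Same user, or unknown user: run directly so the real credentials stay attached.
      func()
    }
  }

  def main(args: Array[String]): Unit = {
    runAsUser("alice") { () =>  // "alice" is a hypothetical user name
      println(s"effective user: ${UserGroupInformation.getCurrentUser.getUserName}")
    }
  }
}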
Executor.scala:

@@ -179,10 +179,7 @@ private[spark] class Executor(
       }
     }
 
-    // the runAsUser breaks secure HDFS access. It needs to add the credentials
-    // for the user if running as a user. Comment out for now.
-    //override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-    override def run(): Unit = {
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
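And a hedged sketch of the executor-side pattern this hunk restores. TaskRunnerSketch is a heavily simplified stand-in for Executor's TaskRunner, and the org.apache.spark.deploy import path is assumed; SparkHadoopUtil.get.runAsUser itself is taken from the diff above:

import org.apache.spark.deploy.SparkHadoopUtil  // assumed package path

// Simplified stand-in for Executor's TaskRunner: the whole task body now executes
// inside runAsUser, so HDFS access during the task is performed as the intended user.
class TaskRunnerSketch(sparkUser: String, taskBody: () => Unit) extends Runnable {
  override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
    taskBody()
  }
}

With the new guard in SparkHadoopUtil, this wrapper is safe again on secure HDFS: when sparkUser already matches the process user (or is unknown), runAsUser invokes the body directly instead of entering a credential-less doAs.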