Commit 8af99f23 authored by Matei Zaharia

Merge pull request #149 from tgravescs/fixSecureHdfsAccess

Fix secure hdfs access for spark on yarn

https://github.com/apache/incubator-spark/pull/23 broke secure HDFS access. I am not sure whether it works with secure HDFS on standalone; this fixes it at least for Spark on YARN.

The jobconf-broadcasting change also broke secure HDFS access, because it did not take into account callers invoking getPartitions before the SparkContext is initialized. The DAGScheduler does this when it tries to getShuffleMapStage.
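
For background, a minimal sketch (not from this commit; "someUser" is a placeholder) of why a doAs around a freshly created remote user breaks secure HDFS:

    import org.apache.hadoop.security.UserGroupInformation

    // A UGI built with createRemoteUser carries no Kerberos ticket and no
    // delegation tokens, so Hadoop RPCs issued inside its doAs block fail
    // against a kerberized cluster.
    val remote = UserGroupInformation.createRemoteUser("someUser")
    println(remote.hasKerberosCredentials())  // false: no TGT, no tokens

    // The login user keeps the credentials obtained at login (kinit or
    // keytab), which is why skipping the doAs preserves HDFS access.
    val login = UserGroupInformation.getLoginUser()
    println(login.hasKerberosCredentials())   // true on a kerberized setup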
parents 72a601ec 13a19505
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // If we are already running as the intended user, there is no reason to do the doAs. It
+    // will actually break secure HDFS access, as it doesn't fill in the credentials. Also, if
+    // the user is UNKNOWN we shouldn't be creating a remote unknown user
+    // (this is actually the path Spark on YARN takes) since SPARK_USER is initialized only
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**
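
To illustrate the new behavior, a hedged usage sketch (the call site and "someUser" are hypothetical; in Spark the executor passes the SPARK_USER value): when the requested user is UNKNOWN or already matches the process owner, func() now runs directly and the login credentials stay intact:

    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical call site: "someUser" stands in for the SPARK_USER value.
    SparkHadoopUtil.get.runAsUser("someUser") { () =>
      // Hadoop filesystem calls here see the current user's credentials;
      // only a genuinely different, known user is wrapped in a doAs.
    }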
@@ -132,6 +132,8 @@ class HadoopRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
+    // Add the credentials here, as this can be called before the SparkContext is initialized.
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
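
In the base SparkHadoopUtil, addCredentials is effectively a no-op; the YARN build overrides it. A sketch of what such an override does, assuming Hadoop's Credentials API (Configuration.getCredentials and Credentials.mergeAll):

    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.security.UserGroupInformation

    // Sketch of a YARN-side override: merge the current user's delegation
    // tokens and secrets into the JobConf so that tasks created from it can
    // authenticate to secure HDFS.
    def addCredentials(conf: JobConf) {
      val jobCreds = conf.getCredentials()
      jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
    }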