Skip to content
Snippets Groups Projects
Commit 2c0f705e authored by Thomas Graves's avatar Thomas Graves
Browse files

SPARK-1528 - spark on yarn, add support for accessing remote HDFS

Add a config (spark.yarn.access.namenodes) to allow applications running on yarn to access other secure HDFS cluster.  User just specifies the namenodes of the other clusters and we get Tokens for those and ship them with the spark application.

Author: Thomas Graves <tgraves@apache.org>

Closes #1159 from tgravescs/spark-1528 and squashes the following commits:

ddbcd16 [Thomas Graves] review comments
0ac8501 [Thomas Graves] SPARK-1528 - add support for accessing remote HDFS
parent e87075df
No related branches found
No related tags found
No related merge requests found
...@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes ...@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
set this configuration to "hdfs:///some/path". set this configuration to "hdfs:///some/path".
</td> </td>
</tr> </tr>
<tr>
<td><code>spark.yarn.access.namenodes</code></td>
<td>(none)</td>
<td>
  A list of secure HDFS namenodes your Spark application is going to access. For example, `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
</td>
</tr>
</table> </table>
# Launching Spark on YARN # Launching Spark on YARN
......
...@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._ ...@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.util.StringUtils import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
...@@ -191,23 +191,11 @@ trait ClientBase extends Logging { ...@@ -191,23 +191,11 @@ trait ClientBase extends Logging {
// Upload Spark and the application JAR to the remote file system if necessary. Add them as // Upload Spark and the application JAR to the remote file system if necessary. Add them as
// local resources to the application master. // local resources to the application master.
val fs = FileSystem.get(conf) val fs = FileSystem.get(conf)
val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer"
logError(errorMessage)
throw new SparkException(errorMessage)
}
}
val dst = new Path(fs.getHomeDirectory(), appStagingDir) val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
val localResources = HashMap[String, LocalResource]() val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
...@@ -614,4 +602,40 @@ object ClientBase extends Logging { ...@@ -614,4 +602,40 @@ object ClientBase extends Logging {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path, YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
File.pathSeparator) File.pathSeparator)
/**
 * Parse the set of namenodes the application may need to talk to.
 *
 * Reads the comma-separated `spark.yarn.access.namenodes` config entry,
 * drops blank entries, and turns each remaining entry into a [[Path]].
 * Returns the empty set when the config is unset or blank.
 */
private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
  val configured = sparkConf.get("spark.yarn.access.namenodes", "")
  configured.split(",")
    .map(_.trim())
    .filter(_.nonEmpty)
    .map(new Path(_))
    .toSet
}
/**
 * Look up the principal to use as the delegation-token renewer
 * (the YARN master principal from the Hadoop configuration).
 *
 * Throws a SparkException when no principal can be determined, since
 * tokens acquired without a valid renewer could not be renewed later.
 */
private[yarn] def getTokenRenewer(conf: Configuration): String = {
  val renewer = Master.getMasterPrincipal(conf)
  logDebug("delegation token renewer is: " + renewer)
  if (renewer == null || renewer.isEmpty) {
    val msg = "Can't get Master Kerberos principal for use as renewer"
    logError(msg)
    throw new SparkException(msg)
  }
  renewer
}
/**
 * Fetch delegation tokens for every namenode in `paths` and stash them
 * into `creds`, so they can be shipped with the application.
 *
 * No-op when security is disabled; otherwise resolves the renewer once
 * and asks each path's filesystem for its tokens.
 */
private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
  creds: Credentials) {
  if (UserGroupInformation.isSecurityEnabled()) {
    val renewer = getTokenRenewer(conf)
    for (path <- paths) {
      val fs = path.getFileSystem(conf)
      logDebug("getting token for namenode: " + path)
      fs.addDelegationTokens(renewer, creds)
    }
  }
}
} }
...@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext ...@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Matchers._ import org.mockito.Matchers._
import org.mockito.Mockito._ import org.mockito.Mockito._
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.scalatest.Matchers import org.scalatest.Matchers
...@@ -38,7 +40,7 @@ import scala.collection.JavaConversions._ ...@@ -38,7 +40,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap => MutableHashMap } import scala.collection.mutable.{ HashMap => MutableHashMap }
import scala.util.Try import scala.util.Try
import org.apache.spark.SparkConf import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
class ClientBaseSuite extends FunSuite with Matchers { class ClientBaseSuite extends FunSuite with Matchers {
...@@ -138,6 +140,57 @@ class ClientBaseSuite extends FunSuite with Matchers { ...@@ -138,6 +140,57 @@ class ClientBaseSuite extends FunSuite with Matchers {
} }
} }
// Parsing of spark.yarn.access.namenodes: blank, unset, single, padded,
// and multi-entry values should all resolve to the expected Path set.
test("check access nns empty") {
  val conf = new SparkConf()
  conf.set("spark.yarn.access.namenodes", "")
  ClientBase.getNameNodesToAccess(conf) should be (Set())
}
test("check access nns unset") {
  ClientBase.getNameNodesToAccess(new SparkConf()) should be (Set())
}
test("check access nns") {
  val conf = new SparkConf()
  conf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
  ClientBase.getNameNodesToAccess(conf) should be (Set(new Path("hdfs://nn1:8032")))
}
test("check access nns space") {
  // A trailing comma plus whitespace must not produce an extra entry.
  val conf = new SparkConf()
  conf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
  ClientBase.getNameNodesToAccess(conf) should be (Set(new Path("hdfs://nn1:8032")))
}
test("check access two nns") {
  val conf = new SparkConf()
  conf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
  val expected = Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))
  ClientBase.getNameNodesToAccess(conf) should be (expected)
}
// Renewer resolution: with an RM principal configured it is returned
// verbatim; without one, getTokenRenewer must fail loudly.
test("check token renewer") {
  val hadoopConf = new Configuration()
  hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
  hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
  ClientBase.getTokenRenewer(hadoopConf) should be ("yarn/myrm:8032@SPARKTEST.COM")
}
test("check token renewer default") {
  val thrown = intercept[SparkException] {
    ClientBase.getTokenRenewer(new Configuration())
  }
  assert(thrown.getMessage === "Can't get Master Kerberos principal for use as renewer")
}
object Fixtures { object Fixtures {
val knownDefYarnAppCP: Seq[String] = val knownDefYarnAppCP: Seq[String] =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment