Commit 4239a108 authored by jerryshao, committed by Tom Graves

[SPARK-19021][YARN] Generalize HDFSCredentialProvider to support non-HDFS security filesystems

Currently Spark can only get the token renewal interval from secure HDFS (hdfs://). If Spark runs against other secure file systems such as webHDFS (webhdfs://), wasb (wasb://), or ADLS, it ignores those tokens and cannot obtain their renewal intervals, which makes Spark unable to work with those secure clusters. So instead of checking only for the HDFS token, we should generalize the provider to support any DelegationTokenIdentifier.
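The core of the change can be sketched as follows (a condensed illustration distilled from the diff below, not the committed method itself; the helper name is made up): instead of matching only the HDFS delegation token kind, every token whose identifier extends Hadoop's `AbstractDelegationTokenIdentifier` contributes to the next-renewal calculation.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// Condensed illustration: compute the next renewal date across all delegation tokens
// whose identifier extends AbstractDelegationTokenIdentifier (hdfs://, webhdfs://,
// wasb://, ADLS, ...) rather than only tokens of the HDFS-specific kind.
def nextRenewalDate(creds: Credentials, renewalInterval: Long): Option[Long] = {
  val renewalDates = creds.getAllTokens.asScala
    .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
    .map { token =>
      val id = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
      id.getIssueDate + renewalInterval
    }
  if (renewalDates.isEmpty) None else Some(renewalDates.min)
}
```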

## How was this patch tested?

Manually verified in a secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #16432 from jerryshao/SPARK-19021.
parent a6155135
@@ -479,12 +479,12 @@ Hadoop services issue *hadoop tokens* to grant access to the services and data.
 Clients must first acquire tokens for the services they will access and pass them along with their
 application as it is launched in the YARN cluster.
-For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens
+For a Spark application to interact with any of the Hadoop filesystems (for example hdfs, webhdfs, etc.), HBase and Hive, it must acquire the relevant tokens
 using the Kerberos credentials of the user launching the application
 —that is, the principal whose identity will become that of the launched Spark application.
 This is normally done at launch time: in a secure cluster Spark will automatically obtain a
-token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
+token for the cluster's default Hadoop filesystem, and potentially for HBase and Hive.
 An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
@@ -494,12 +494,12 @@ Similarly, a Hive token will be obtained if Hive is on the classpath, its config
 includes a URI of the metadata store in `hive.metastore.uris`, and
 `spark.yarn.security.credentials.hive.enabled` is not set to `false`.
-If an application needs to interact with other secure HDFS clusters, then
+If an application needs to interact with other secure Hadoop filesystems, then
 the tokens needed to access these clusters must be explicitly requested at
 launch time. This is done by listing them in the `spark.yarn.access.namenodes` property.
 ```
-spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
+spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/
 ```
 Spark supports integrating with other security-aware services through Java Services mechanism (see
@@ -558,8 +558,8 @@ For Spark applications, the Oozie workflow must be set up for Oozie to request a
 the application needs, including:
 - The YARN resource manager.
-- The local HDFS filesystem.
-- Any remote HDFS filesystems used as a source or destination of I/O.
+- The local Hadoop filesystem.
+- Any remote Hadoop filesystems used as a source or destination of I/O.
 - Hive —if used.
 - HBase —if used.
 - The YARN timeline server, if the application interacts with this.
......
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
 org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
 org.apache.spark.deploy.yarn.security.HiveCredentialProvider
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 /**
  * A ConfigurableCredentialManager to manage all the registered credential providers and offer
  * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
  * be loaded if not explicitly disabled. Any plugged-in credential provider that wants to be
  * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
  * interface and be put into resources/META-INF/services to be loaded by the ServiceLoader.
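For context, a third-party provider plugged in through this mechanism would look roughly like the sketch below (hypothetical class, package, and token logic; only the trait members and the META-INF/services registration path come from the code above):

```scala
package com.example.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider. Register it by listing the fully qualified class name in
// META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider;
// it can then be toggled with spark.yarn.security.credentials.myservice.enabled.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Name used to build the per-service enable/disable configuration key.
  override def serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain a delegation token for the external service, add it to `creds`,
    // and return the next renewal time in milliseconds (None if not renewable).
    None
  }
}
```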
......
@@ -17,37 +17,40 @@
 package org.apache.spark.deploy.yarn.security
 
-import java.io.{ByteArrayInputStream, DataInputStream}
-
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+private[security] class HadoopFSCredentialProvider
+    extends ServiceCredentialProvider with Logging {
   // Token renewal interval, this value will be set in the first call,
-  // if None means no token renewer specified, so cannot get token renewal interval.
+  // if None means no token renewer specified or no token can be renewed,
+  // so cannot get token renewal interval.
   private var tokenRenewalInterval: Option[Long] = null
 
-  override val serviceName: String = "hdfs"
+  override val serviceName: String = "hadoopfs"
 
   override def obtainCredentials(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     // NameNode to access, used to get tokens from different FileSystems
+    val tmpCreds = new Credentials()
+    val tokenRenewer = getTokenRenewer(hadoopConf)
     nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
       val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for namenode: " + dst)
-      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+      logInfo("getting token for: " + dst)
+      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
     }
 
     // Get the token renewal interval if it is not set. It will only be called once.
@@ -56,15 +59,18 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
     }
 
     // Get the time of next renewal.
-    tokenRenewalInterval.map { interval =>
-      creds.getAllTokens.asScala
-        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+      val nextRenewalDates = tmpCreds.getAllTokens.asScala
+        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
         .map { t =>
-          val identifier = new DelegationTokenIdentifier()
-          identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+          val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
           identifier.getIssueDate + interval
-        }.foldLeft(0L)(math.max)
+        }
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }
+
+    creds.addAll(tmpCreds)
+    nextRenewalDate
   }
 
   private def getTokenRenewalInterval(
@@ -78,16 +84,19 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
       val dstFs = dst.getFileSystem(hadoopConf)
       dstFs.addDelegationTokens(renewer, creds)
     }
-    val hdfsToken = creds.getAllTokens.asScala
-      .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-    hdfsToken.map { t =>
-      val newExpiration = t.renew(hadoopConf)
-      val identifier = new DelegationTokenIdentifier()
-      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-      val interval = newExpiration - identifier.getIssueDate
-      logInfo(s"Renewal Interval is $interval")
-      interval
+
+    val renewIntervals = creds.getAllTokens.asScala.filter {
+      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+    }.flatMap { token =>
+      Try {
+        val newExpiration = token.renew(hadoopConf)
+        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+        val interval = newExpiration - identifier.getIssueDate
+        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        interval
+      }.toOption
     }
+    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }
......
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
   test("Correctly load default credential providers") {
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should not be (None)
   }
 
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
     sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
   }
 
   test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
     sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
     credentialManager.getServiceCredentialProvider("test") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
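Note that because the service name changes from `hdfs` to `hadoopfs`, the per-service toggle follows the rename. A minimal sketch, assuming the `spark.yarn.security.credentials.<service>.enabled` pattern used in the tests above:

```scala
import org.apache.spark.SparkConf

// Disable the renamed filesystem credential provider (key assumed from the pattern above).
val conf = new SparkConf()
  .set("spark.yarn.security.credentials.hadoopfs.enabled", "false")
```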
......
@@ -23,30 +23,30 @@ import org.scalatest.{Matchers, PrivateMethodTester}
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 
-class HDFSCredentialProviderSuite
+class HadoopFSCredentialProviderSuite
     extends SparkFunSuite
     with PrivateMethodTester
     with Matchers {
   private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
   private def getTokenRenewer(
-      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
-    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+      fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
+    fsCredentialProvider invokePrivate _getTokenRenewer(conf)
   }
 
-  private var hdfsCredentialProvider: HDFSCredentialProvider = null
+  private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
 
   override def beforeAll() {
     super.beforeAll()
 
-    if (hdfsCredentialProvider == null) {
-      hdfsCredentialProvider = new HDFSCredentialProvider()
+    if (hadoopFsCredentialProvider == null) {
+      hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
     }
   }
 
   override def afterAll() {
-    if (hdfsCredentialProvider != null) {
-      hdfsCredentialProvider = null
+    if (hadoopFsCredentialProvider != null) {
+      hadoopFsCredentialProvider = null
     }
 
     super.afterAll()
@@ -56,7 +56,7 @@ class HDFSCredentialProviderSuite
     val hadoopConf = new Configuration()
     hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
     hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
-    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+    val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
     renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
   }
 
@@ -64,7 +64,7 @@ class HDFSCredentialProviderSuite
     val hadoopConf = new Configuration()
     val caught =
       intercept[SparkException] {
-        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+        getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
       }
     assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
   }
......