Skip to content
Snippets Groups Projects
Commit 5c3912e5 authored by huangzhaowei's avatar huangzhaowei Committed by Tom Graves
Browse files

[SPARK-12523][YARN] Support long-running of the Spark On HBase and hive meta store.

Obtain the Hive metastore and HBase tokens, as well as the HDFS token, in DelegationTokenRenewer to support long-running applications of Spark on HBase or the Thrift server.

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #10645 from SaintBacchus/SPARK-12523.
parent 318bf411
No related branches found
No related tags found
No related merge requests found
......@@ -172,6 +172,8 @@ private[yarn] class AMDelegationTokenRenewer(
// Re-obtains every delegation token a long-running app needs: HDFS tokens for all
// configured name nodes plus, when enabled, Hive metastore and HBase tokens.
override def run(): Void = {
// Set of name nodes to access; `dst` is presumably the app's staging/destination
// path added on top of the configured set — TODO confirm in the enclosing class.
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
// These helpers add tokens into tempCreds only when security is enabled and the
// corresponding spark.yarn.security.tokens.<service>.enabled flag is not false.
hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds)
hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds)
// Callable[Void] requires an explicit null return value.
null
}
})
......
......@@ -345,8 +345,8 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
obtainTokenForHBase(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
......@@ -1357,35 +1357,6 @@ object Client extends Logging {
}
}
/**
 * Obtains a Hive metastore delegation token, when security is enabled and Hive token
 * retrieval has not been disabled, and registers it with the given credentials under
 * the HiveServer2 token alias.
 */
private def obtainTokenForHiveMetastore(
    sparkConf: SparkConf,
    conf: Configuration,
    credentials: Credentials) {
  val hiveTokensWanted = shouldGetTokens(sparkConf, "hive")
  if (hiveTokensWanted && UserGroupInformation.isSecurityEnabled) {
    val hiveToken = YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf)
    hiveToken.foreach { token =>
      credentials.addToken(new Text("hive.server2.delegation.token"), token)
    }
  }
}
/**
 * Fetches an HBase delegation token, when security is enabled and HBase token
 * retrieval has not been disabled, and stores it in the supplied credentials.
 */
def obtainTokenForHBase(
    sparkConf: SparkConf,
    conf: Configuration,
    credentials: Credentials): Unit = {
  // Both conditions are side-effect free, so evaluation order is immaterial.
  if (UserGroupInformation.isSecurityEnabled && shouldGetTokens(sparkConf, "hbase")) {
    for (token <- YarnSparkHadoopUtil.get.obtainTokenForHBase(conf)) {
      credentials.addToken(token.getService, token)
      logInfo("Added HBase security token to credentials.")
    }
  }
}
/**
* Return whether the two file systems are the same.
*/
......@@ -1450,13 +1421,4 @@ object Client extends Logging {
components.mkString(Path.SEPARATOR)
}
/**
 * Whether a delegation token should be fetched for the named service when security is
 * enabled. Retrieval is on by default and may be switched off per service through the
 * `spark.yarn.security.tokens.<service>.enabled` configuration.
 */
def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
  val key = s"spark.yarn.security.tokens.${service}.enabled"
  conf.getBoolean(key, true)
}
}
......@@ -133,6 +133,44 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
}
/**
 * Obtains a delegation token for the Hive metastore and adds it to the given credentials
 * under the "hive.server2.delegation.token" alias.
 *
 * No-op unless security is enabled and `spark.yarn.security.tokens.hive.enabled` is not
 * set to false.
 *
 * @param sparkConf Spark configuration, consulted for the hive token enable flag.
 * @param conf Hadoop configuration handed to the single-argument token provider.
 * @param credentials credential store the token is added to.
 */
def obtainTokenForHiveMetastore(
    sparkConf: SparkConf,
    conf: Configuration,
    credentials: Credentials): Unit = {
  // Explicit `: Unit =` replaces the deprecated procedure syntax, matching the
  // style of the sibling obtainTokenForHBase method.
  if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
    // The single-argument overload yields an Option; the token is added only if present.
    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
      credentials.addToken(new Text("hive.server2.delegation.token"), _)
    }
  }
}
/**
 * Acquires an HBase delegation token and registers it with the supplied credentials.
 * Does nothing unless security is enabled and HBase token retrieval is enabled.
 */
def obtainTokenForHBase(
    sparkConf: SparkConf,
    conf: Configuration,
    credentials: Credentials): Unit = {
  // Both checks are pure reads, so the order of evaluation does not matter.
  if (UserGroupInformation.isSecurityEnabled && shouldGetTokens(sparkConf, "hbase")) {
    for (token <- YarnSparkHadoopUtil.get.obtainTokenForHBase(conf)) {
      credentials.addToken(token.getService, token)
      logInfo("Added HBase security token to credentials.")
    }
  }
}
/**
 * Checks whether a delegation token should be retrieved for the given service when
 * security is enabled. Defaults to true; can be disabled per service by setting
 * `spark.yarn.security.tokens.<service>.enabled` to false.
 */
private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
  val flag = s"spark.yarn.security.tokens.${service}.enabled"
  conf.getBoolean(flag, true)
}
private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
tokenRenewer.get.updateCredentialsIfRequired()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment