Skip to content
Snippets Groups Projects
Commit 53465075 authored by jerryshao's avatar jerryshao Committed by Marcelo Vanzin
Browse files

[SPARK-21377][YARN] Make jars specified with --jars/--packages load-able in AM's credential renewer

## What changes were proposed in this pull request?

In this issue we have a long running Spark application with secure HBase, which requires `HBaseCredentialProvider` to get tokens periodically. We specify HBase related jars with `--packages`, but these dependencies are not added to the AM classpath, so when `HBaseCredentialProvider` tries to initialize HBase connections to get tokens, it will fail.

Currently, because jars specified with `--jars` or `--packages` are not added to the AM classpath, the only way to extend the AM classpath is to use "spark.driver.extraClassPath", which is supposed to be used only in yarn cluster mode.

So in this fix, we proposed to use/reuse a classloader for `AMCredentialRenewer` to acquire new tokens.

Also in this patch, we fixed the issue where the AM cannot get tokens from HDFS: the FileSystem object is obtained before the Kerberos login completes, so using this FS to get tokens throws an exception.

## How was this patch tested?

Manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18616 from jerryshao/SPARK-21377.
parent 0e07a29c
No related branches found
No related tags found
No related merge requests found
...@@ -90,6 +90,23 @@ private[spark] class ApplicationMaster( ...@@ -90,6 +90,23 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _ @volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _ @volatile private var allocator: YarnAllocator = _
// Classloader that exposes the user's --jars/--packages entries to AM-side code
// (e.g. the credential renewer), not just to the user application thread.
private val userClassLoader = {
  val classpath = Client.getUserClasspath(sparkConf)
  val urls = classpath.map { entry =>
    new URL("file:" + new File(entry.getPath()).getAbsolutePath())
  }

  // "User classpath first" is only honored in cluster mode, where the AM hosts
  // the driver; in client mode the user classes are always appended after the
  // Spark classes. The original nested if/else duplicated the else branch —
  // collapsed into a single condition with identical behavior.
  if (isClusterMode && Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
    new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
  } else {
    new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
  }
}
// Lock for controlling the allocator (heartbeat) thread. // Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object() private val allocatorLock = new Object()
...@@ -242,16 +259,27 @@ private[spark] class ApplicationMaster( ...@@ -242,16 +259,27 @@ private[spark] class ApplicationMaster(
// If the credentials file config is present, we must periodically renew tokens. So create // If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer // a new AMDelegationTokenRenewer
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) { if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
// If a principal and keytab have been set, use that to create new credentials for executors // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
// periodically // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
val credentialManager = new YARNHadoopDelegationTokenManager( val credentialRenewerThread = new Thread {
sparkConf, setName("AMCredentialRenewerStarter")
yarnConf, setContextClassLoader(userClassLoader)
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
override def run(): Unit = {
val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) val credentialManager = new YARNHadoopDelegationTokenManager(
credentialRenewer.scheduleLoginFromKeytab() sparkConf,
yarnConf,
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
val credentialRenewer =
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
}
}
credentialRenewerThread.start()
credentialRenewerThread.join()
} }
if (isClusterMode) { if (isClusterMode) {
...@@ -609,17 +637,6 @@ private[spark] class ApplicationMaster( ...@@ -609,17 +637,6 @@ private[spark] class ApplicationMaster(
private def startUserApplication(): Thread = { private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread") logInfo("Starting the user application in a separate Thread")
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
val userClassLoader =
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
var userArgs = args.userArgs var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
// When running pyspark, the app is run using PythonRunner. The second argument is the list // When running pyspark, the app is run using PythonRunner. The second argument is the list
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment