Commit 646366b5 authored by huangzhaowei, committed by Andrew Or

[SPARK-8688] [YARN] Bug fix: disable the cache fs to gain the HDFS connection.

If `fs.hdfs.impl.disable.cache` is `false` (the default), `FileSystem` will reuse the cached `DFSClient`, which still holds the old token.
[AMDelegationTokenRenewer](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala#L196)
```scala
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
```
Even though `credentials` holds the new token, the write above goes through the cached client and therefore authenticates with the old token.
It is better to set `fs.hdfs.impl.disable.cache` to `true` to avoid using an expired token.
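
For illustration, here is a minimal, self-contained sketch of the caching behaviour being worked around (not part of this patch): `FileSystem.get` caches clients per scheme, authority, and user, so repeated calls return the same object until the cache is disabled for that scheme. The demo uses the local `file` scheme so it runs without a cluster; the patch applies the same `fs.<scheme>.impl.disable.cache` key for the credential file's scheme, typically `hdfs`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration() // fs.defaultFS is file:/// here

    // Default behaviour: FileSystem.get caches clients per (scheme, authority,
    // user), so both calls return the very same instance. On HDFS this is why
    // a DFSClient created before a token renewal keeps using the old token.
    val cached1 = FileSystem.get(conf)
    val cached2 = FileSystem.get(conf)
    println(cached1 eq cached2) // true

    // Disabling the cache for the scheme (fs.<scheme>.impl.disable.cache)
    // forces FileSystem.get to build a fresh client that picks up the
    // current user's latest credentials.
    val freshConf = new Configuration(conf)
    freshConf.setBoolean("fs.file.impl.disable.cache", true)
    val fresh = FileSystem.get(freshConf)
    println(fresh eq cached1) // false
  }
}
```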

[Jira](https://issues.apache.org/jira/browse/SPARK-8688)

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #7069 from SaintBacchus/SPARK-8688 and squashes the following commits:

f94cd0b [huangzhaowei] modify function parameter
8fb9eb9 [huangzhaowei] explicit the comment
0cd55c9 [huangzhaowei] Rename function name to be an accurate one
cf776a1 [huangzhaowei] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the HDFS connection.
parent 792fcd80
```diff
@@ -334,6 +334,19 @@ class SparkHadoopUtil extends Logging {
    * Stop the thread that does the delegation token updates.
    */
   private[spark] def stopExecutorDelegationTokenRenewer() {}
+
+  /**
+   * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
+   * This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
+   */
+  private[spark] def getConfBypassingFSCache(
+      hadoopConf: Configuration,
+      scheme: String): Configuration = {
+    val newConf = new Configuration(hadoopConf)
+    val confKey = s"fs.${scheme}.impl.disable.cache"
+    newConf.setBoolean(confKey, true)
+    newConf
+  }
 }

 object SparkHadoopUtil {
```
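
For illustration, a short usage sketch of the new helper, mirroring how the two renewer classes below call it (the path value is a made-up example, and a `hadoopConf: Configuration` is assumed to be in scope):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical example path; in Spark this comes from
// sparkConf.get("spark.yarn.credentials.file").
val credentialsFile = "hdfs://namenode:8020/user/spark/credentials"
val scheme = new Path(credentialsFile).toUri.getScheme // "hdfs"

// Produces a copy of hadoopConf with fs.hdfs.impl.disable.cache = true;
// FileSystem.get(freshConf) will then build a new DFSClient instead of
// reusing a cached one that may hold an expired token.
val freshConf: Configuration =
  SparkHadoopUtil.get.getConfBypassingFSCache(hadoopConf, scheme)
```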
```diff
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
     sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
   private val numFilesToKeep =
     sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val freshHadoopConf =
+    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)

   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
   private def cleanupOldFiles(): Unit = {
     import scala.concurrent.duration._
     try {
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       val credentialsPath = new Path(credentialsFile)
       val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
       hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+        hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
         null
       }
     })
     // Add the temp credentials back to the original ones.
     UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
+    val remoteFs = FileSystem.get(freshHadoopConf)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
     val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
     logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
     logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
     remoteFs.rename(tempTokenPath, tokenPath)
     logInfo("Delegation token file rename complete.")
```
```diff
@@ -35,6 +35,9 @@ private[spark] class ExecutorDelegationTokenUpdater(
   @volatile private var lastCredentialsFileSuffix = 0

   private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val freshHadoopConf =
+    SparkHadoopUtil.get.getConfBypassingFSCache(
+      hadoopConf, new Path(credentialsFile).toUri.getScheme)

   private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +52,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
   def updateCredentialsIfRequired(): Unit = {
     try {
       val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       SparkHadoopUtil.get.listFilesSorted(
         remoteFs, credentialsFilePath.getParent,
         credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
```