From 646366b5d2f12e42f8e7287672ba29a8c918a17d Mon Sep 17 00:00:00 2001
From: huangzhaowei <carlmartinmax@gmail.com>
Date: Wed, 1 Jul 2015 23:01:44 -0700
Subject: [PATCH] [SPARK-8688] [YARN] Bug fix: disable the cache fs to gain the
 HDFS connection.

If `fs.hdfs.impl.disable.cache` was `false`(default), `FileSystem` will use the cached `DFSClient` which use old token.
[AMDelegationTokenRenewer](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala#L196)
```scala
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
    credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)
```
Although the `credentials` had the new Token, but it still use the cached client and old token.
So It's better to set the `fs.hdfs.impl.disable.cache`  as `true` to avoid token expired.

[Jira](https://issues.apache.org/jira/browse/SPARK-8688)

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #7069 from SaintBacchus/SPARK-8688 and squashes the following commits:

f94cd0b [huangzhaowei] modify function parameter
8fb9eb9 [huangzhaowei] explicit  the comment
0cd55c9 [huangzhaowei] Rename function name to be an accurate one
cf776a1 [huangzhaowei] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the HDFS connection.
---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala   | 13 +++++++++++++
 .../deploy/yarn/AMDelegationTokenRenewer.scala      | 10 ++++++----
 .../yarn/ExecutorDelegationTokenUpdater.scala       |  5 ++++-
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 7fa75ac8c2..6d14590a1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -334,6 +334,19 @@ class SparkHadoopUtil extends Logging {
    * Stop the thread that does the delegation token updates.
    */
   private[spark] def stopExecutorDelegationTokenRenewer() {}
+
+  /**
+   * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
+   * This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
+   */
+  private[spark] def getConfBypassingFSCache(
+      hadoopConf: Configuration,
+      scheme: String): Configuration = {
+    val newConf = new Configuration(hadoopConf)
+    val confKey = s"fs.${scheme}.impl.disable.cache"
+    newConf.setBoolean(confKey, true)
+    newConf
+  }
 }
 
 object SparkHadoopUtil {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 77af46c192..56e4741b93 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
     sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
   private val numFilesToKeep =
     sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val freshHadoopConf =
+    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
   private def cleanupOldFiles(): Unit = {
     import scala.concurrent.duration._
     try {
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       val credentialsPath = new Path(credentialsFile)
       val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
       hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+        hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
         null
       }
     })
     // Add the temp credentials back to the original ones.
     UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
+    val remoteFs = FileSystem.get(freshHadoopConf)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
     val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
     logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
     logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
     remoteFs.rename(tempTokenPath, tokenPath)
     logInfo("Delegation token file rename complete.")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 229c2c4d5e..94feb6393f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -35,6 +35,9 @@ private[spark] class ExecutorDelegationTokenUpdater(
   @volatile private var lastCredentialsFileSuffix = 0
 
   private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val freshHadoopConf =
+    SparkHadoopUtil.get.getConfBypassingFSCache(
+      hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
   private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +52,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
   def updateCredentialsIfRequired(): Unit = {
     try {
       val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(freshHadoopConf)
       SparkHadoopUtil.get.listFilesSorted(
         remoteFs, credentialsFilePath.getParent,
         credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-- 
GitLab