diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index a0729757b7e1e2d2755e1d6c33b0c6724aa11646..f7513454c7852ff5b3f26d770bd55a1989e77b0b 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -479,12 +479,12 @@ Hadoop services issue *hadoop tokens* to grant access to the services and data.
 Clients must first acquire tokens for the services they will access and pass them along with their
 application as it is launched in the YARN cluster.
 
-For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens
+For a Spark application to interact with any of the Hadoop filesystems (for example HDFS or WebHDFS), HBase and Hive, it must acquire the relevant tokens
 using the Kerberos credentials of the user launching the application
 —that is, the principal whose identity will become that of the launched Spark application.
 
 This is normally done at launch time: in a secure cluster Spark will automatically obtain a
-token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
+token for the cluster's default Hadoop filesystem, and potentially for HBase and Hive.
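+
+For example, a long-running application can be given a Kerberos principal and keytab at launch
+time so that Spark can log in and re-obtain these tokens as they expire (the principal and
+keytab path below are illustrative):
+
+```
+spark.yarn.principal user@EXAMPLE.COM
+spark.yarn.keytab /path/to/user.keytab
+```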
 
 An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
@@ -494,12 +494,12 @@ Similarly, a Hive token will be obtained if Hive is on the classpath, its config
 includes a URI of the metadata store in `hive.metastore.uris`, and
 `spark.yarn.security.credentials.hive.enabled` is not set to `false`.
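+
+Token acquisition for a service that is not needed can be switched off explicitly; for example,
+to skip obtaining a Hive token:
+
+```
+spark.yarn.security.credentials.hive.enabled false
+```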
 
-If an application needs to interact with other secure HDFS clusters, then
+If an application needs to interact with other secure Hadoop filesystems, then
 the tokens needed to access these clusters must be explicitly requested at
 launch time. This is done by listing them in the `spark.yarn.access.namenodes` property.
 
 ```
-spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
+spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/
 ```
 
 Spark supports integrating with other security-aware services through the Java Services mechanism (see
@@ -558,8 +558,8 @@ For Spark applications, the Oozie workflow must be set up for Oozie to request a
 the application needs, including:
 
 - The YARN resource manager.
-- The local HDFS filesystem.
-- Any remote HDFS filesystems used as a source or destination of I/O.
+- The local Hadoop filesystem.
+- Any remote Hadoop filesystems used as a source or destination of I/O.
 - Hive —if used.
 - HBase —if used.
 - The YARN timeline server, if the application interacts with this.
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index 22ead56d2345d766ffdd001e6b0cbba80ecf2004..f5a807ecac9d7d63e93d34573503676033dd9196 100644
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1,3 +1,3 @@
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
 org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
 org.apache.spark.deploy.yarn.security.HiveCredentialProvider
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
index 933736bd2271427e8c72b0177d6e30be192055bf..4f4be52a0d6915d63cabb075ac778a94941f60c6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 /**
  * A ConfigurableCredentialManager to manage all the registered credential providers and offer
  * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
  * be loaded in if not explicitly disabled. Any plugged-in credential provider that wants to be
  * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
  * interface and be put into resources/META-INF/services to be loaded by the ServiceLoader.
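For reference, a minimal sketch of such a plugged-in provider, assuming only the members exercised
in this patch; the package, class name and `myservice` token are hypothetical, and the class would
also have to be listed in the `META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
file shown above:

```scala
package com.example.spark.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

/**
 * Hypothetical third-party provider; like the built-in ones it can be disabled with
 * spark.yarn.security.credentials.myservice.enabled=false.
 */
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override val serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain a delegation token for the external service and add it to `creds` here.
    // Return Some(nextRenewalTime) if Spark should schedule a renewal for this token,
    // or None if it does not need to be renewed.
    None
  }
}
```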
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
similarity index 67%
rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
rename to resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
index ebb176bc95caa831cf9e095214ef4f3b1a4344a0..b4fb4a790adc52b249585dbc96cc3cf5662da0e4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
@@ -17,37 +17,40 @@
 
 package org.apache.spark.deploy.yarn.security
 
-import java.io.{ByteArrayInputStream, DataInputStream}
-
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+private[security] class HadoopFSCredentialProvider
+    extends ServiceCredentialProvider with Logging {
   // Token renewal interval, this value will be set in the first call,
-  // if None means no token renewer specified, so cannot get token renewal interval.
+  // if it is None, it means either no token renewer was specified or no token can be renewed,
+  // so the token renewal interval cannot be determined.
   private var tokenRenewalInterval: Option[Long] = null
 
-  override val serviceName: String = "hdfs"
+  override val serviceName: String = "hadoopfs"
 
   override def obtainCredentials(
       hadoopConf: Configuration,
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     // NameNode to access, used to get tokens from different FileSystems
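+    // Collect tokens into a temporary Credentials object first, so that only the newly
+    // obtained tokens are inspected when computing the renewal date; they are merged into
+    // `creds` at the end.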
+    val tmpCreds = new Credentials()
+    val tokenRenewer = getTokenRenewer(hadoopConf)
     nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
       val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for namenode: " + dst)
-      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+      logInfo("getting token for: " + dst)
+      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
     }
 
     // Get the token renewal interval if it is not set. It will only be called once.
@@ -56,15 +59,18 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
     }
 
     // Get the time of next renewal.
-    tokenRenewalInterval.map { interval =>
-      creds.getAllTokens.asScala
-        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+      val nextRenewalDates = tmpCreds.getAllTokens.asScala
+        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
         .map { t =>
-          val identifier = new DelegationTokenIdentifier()
-          identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+          val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
           identifier.getIssueDate + interval
-      }.foldLeft(0L)(math.max)
+        }
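+      // Take the earliest renewal date so renewal happens before any of the tokens expires.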
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }
+
+    creds.addAll(tmpCreds)
+    nextRenewalDate
   }
 
   private def getTokenRenewalInterval(
@@ -78,16 +84,19 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
         val dstFs = dst.getFileSystem(hadoopConf)
         dstFs.addDelegationTokens(renewer, creds)
       }
-      val hdfsToken = creds.getAllTokens.asScala
-        .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      hdfsToken.map { t =>
-        val newExpiration = t.renew(hadoopConf)
-        val identifier = new DelegationTokenIdentifier()
-        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal Interval is $interval")
-        interval
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+          interval
+        }.toOption
       }
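+      // Tokens whose renewal fails are skipped (Try(...).toOption); use the smallest interval.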
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
index db4619e80c8e48115874cdbaa7ec77e013d34799..b0067aa4517c7cd322a7c4b363ba42ec08eb004f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
   test("Correctly load default credential providers") {
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should not be (None)
   }
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
     sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
   }
 
   test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
     sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
     credentialManager.getServiceCredentialProvider("test") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
similarity index 75%
rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
rename to resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
index 7b2da3f26e343a241e9c0aab3a0489acc49d2043..0eb25127238c65d8b87bf0f439d92d7d8a52c2ee 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
@@ -23,30 +23,30 @@ import org.scalatest.{Matchers, PrivateMethodTester}
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 
-class HDFSCredentialProviderSuite
+class HadoopFSCredentialProviderSuite
     extends SparkFunSuite
     with PrivateMethodTester
     with Matchers {
   private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
 
   private def getTokenRenewer(
-      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
-    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+      fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
+    fsCredentialProvider invokePrivate _getTokenRenewer(conf)
   }
 
-  private var hdfsCredentialProvider: HDFSCredentialProvider = null
+  private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
 
   override def beforeAll() {
     super.beforeAll()
 
-    if (hdfsCredentialProvider == null) {
-      hdfsCredentialProvider = new HDFSCredentialProvider()
+    if (hadoopFsCredentialProvider == null) {
+      hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
     }
   }
 
   override def afterAll() {
-    if (hdfsCredentialProvider != null) {
-      hdfsCredentialProvider = null
+    if (hadoopFsCredentialProvider != null) {
+      hadoopFsCredentialProvider = null
     }
 
     super.afterAll()
@@ -56,7 +56,7 @@ class HDFSCredentialProviderSuite
     val hadoopConf = new Configuration()
     hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
     hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
-    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+    val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
     renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
   }
 
@@ -64,7 +64,7 @@ class HDFSCredentialProviderSuite
     val hadoopConf = new Configuration()
     val caught =
       intercept[SparkException] {
-        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+        getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
       }
     assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
   }