diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c1effd3c8a718b331ae21ceef22ccf9b36a83ec8..1091ff54b04637112fa3eee41b791a5d6f6ce9cd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -22,17 +22,21 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
 
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -220,6 +224,7 @@ private[spark] class Client(
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
     obtainTokensForNamenodes(nns, hadoopConf, credentials)
+    obtainTokenForHiveMetastore(hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
       fs.getDefaultReplication(dst)).toShort
@@ -936,6 +941,64 @@ object Client extends Logging {
     }
   }
 
+  /**
+   * Obtains token for the Hive metastore and adds them to the credentials.
+   */
+  private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val mirror = universe.runtimeMirror(getClass.getClassLoader)
+
+      try {
+        val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+        val hive = hiveClass.getMethod("get").invoke(null)
+
+        val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
+        val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+
+        val hiveConfGet = (param:String) => Option(hiveConfClass
+          .getMethod("get", classOf[java.lang.String])
+          .invoke(hiveConf, param))
+
+        val metastore_uri = hiveConfGet("hive.metastore.uris")
+
+        // Check for local metastore
+        if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
+          val metastore_kerberos_principal_conf_var = mirror.classLoader
+            .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
+            .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
+
+          val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
+
+          val username = Option(UserGroupInformation.getCurrentUser().getUserName)
+          if (principal != None && username != None) {
+            val tokenStr = hiveClass.getMethod("getDelegationToken",
+              classOf[java.lang.String], classOf[java.lang.String])
+              .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
+
+            val hive2Token = new Token[DelegationTokenIdentifier]()
+            hive2Token.decodeFromUrlString(tokenStr)
+            credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+            logDebug("Added hive.Server2.delegation.token to conf.")
+            hiveClass.getMethod("closeCurrent").invoke(null)
+          } else {
+            logError("Username or principal == NULL")
+            logError(s"""username=${username.getOrElse("(NULL)")}""")
+            logError(s"""principal=${principal.getOrElse("(NULL)")}""")
+            throw new IllegalArgumentException("username and/or principal is equal to null!")
+          }
+        } else {
+          logDebug("HiveMetaStore configured in localmode")
+        }
+      } catch {
+        case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
+        case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
+        case e:Exception => { logError("Unexpected Exception " + e)
+          throw new RuntimeException("Unexpected exception", e)
+        }
+      }
+    }
+  }
+
   /**
    * Return whether the two file systems are the same.
    */