diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 915ef81b4eae384dd867a27a15c0d5bdd187bc91..175756b80b6bbbd396776eff31b11a31a697ce80 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -255,6 +255,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
     }
+
+    if (proxyUser != null && principal != null) {
+      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+    }
   }
 
   private def validateKillArguments(): Unit = {
@@ -517,6 +521,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
         |  --proxy-user NAME           User to impersonate when submitting the application.
+        |                              This argument does not work with --principal / --keytab.
         |
         |  --help, -h                  Show this help message and exit
         |  --verbose, -v               Print additional debug output
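
The check above makes --proxy-user and --principal mutually exclusive at submit time. Below is a minimal, self-contained sketch of the same fail-fast rule; ValidateSubmitArgs and its Option-based signature are illustrative only — the real check lives in SparkSubmitArguments and calls SparkSubmit.printErrorAndExit() instead of throwing.

// Illustrative sketch, not Spark's actual API.
object ValidateSubmitArgs {
  def validate(proxyUser: Option[String], principal: Option[String]): Unit = {
    if (proxyUser.isDefined && principal.isDefined) {
      // Same message as the patch, but throwing instead of exiting the JVM.
      throw new IllegalArgumentException(
        "Only one of --proxy-user or --principal can be provided.")
    }
  }

  def main(args: Array[String]): Unit = {
    validate(Some("alice"), None)                        // OK: proxy user only
    validate(None, Some("alice@EXAMPLE.COM"))            // OK: principal only
    try {
      validate(Some("alice"), Some("alice@EXAMPLE.COM")) // rejected
    } catch {
      case e: IllegalArgumentException => println(s"Rejected: ${e.getMessage}")
    }
  }
}
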
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4c9432dbd6ab0e982e4e9f34e2442987690483ca..aef78fdfd4c570641ecf88742cba0a7f57879ff3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.lang.reflect.UndeclaredThrowableException
 import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
@@ -194,7 +196,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
    */
   def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
     try {
-      obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+      obtainTokenForHiveMetastoreInner(conf)
     } catch {
       case e: ClassNotFoundException =>
         logInfo(s"Hive class not found $e")
@@ -209,8 +211,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
-   * @param username the username of the principal requesting the delegating token.
    * @return a delegation token
    */
-  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
-      username: String): Option[Token[DelegationTokenIdentifier]] = {
+  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
+      Option[Token[DelegationTokenIdentifier]] = {
     val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
 
     // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -225,11 +227,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
     // Check for local metastore
     if (metastoreUri.nonEmpty) {
-      require(username.nonEmpty, "Username undefined")
       val principalKey = "hive.metastore.kerberos.principal"
       val principal = hiveConf.getTrimmed(principalKey, "")
-      require(principal.nonEmpty, "Hive principal $principalKey undefined")
+      require(principal.nonEmpty, s"Hive principal $principalKey undefined")
-      logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+      val currentUser = UserGroupInformation.getCurrentUser()
+      logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
+        s"$principal at $metastoreUri")
       val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
       val closeCurrent = hiveClass.getMethod("closeCurrent")
       try {
@@ -238,12 +241,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
           classOf[String], classOf[String])
         val getHive = hiveClass.getMethod("get", hiveConfClass)
 
-        // invoke
-        val hive = getHive.invoke(null, hiveConf)
-        val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
-        val hive2Token = new Token[DelegationTokenIdentifier]()
-        hive2Token.decodeFromUrlString(tokenStr)
-        Some(hive2Token)
+        doAsRealUser {
+          val hive = getHive.invoke(null, hiveConf)
+          val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
+            .asInstanceOf[String]
+          val hive2Token = new Token[DelegationTokenIdentifier]()
+          hive2Token.decodeFromUrlString(tokenStr)
+          Some(hive2Token)
+        }
       } finally {
         Utils.tryLogNonFatalError {
           closeCurrent.invoke(null)
@@ -303,6 +308,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     }
   }
 
+  /**
+   * Run some code as the real logged-in user (which may differ from the current user, for
+   * example, when a proxy user is in use).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+    }
+  }
+
 }
 
 object YarnSparkHadoopUtil {
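
To see why doAsRealUser() is needed: under impersonation the current UGI is a proxy that wraps the real (logged-in) user, and only the real user's credentials are acceptable to a Kerberized metastore. A minimal sketch, assuming hadoop-common on the classpath; the proxy name "bob" is arbitrary.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Sketch only: shows that getRealUser() recovers the logged-in user from
// inside a proxy-user context, which is what doAsRealUser() relies on.
object ProxyUserDemo {
  def main(args: Array[String]): Unit = {
    val realUgi = UserGroupInformation.getCurrentUser()
    val proxyUgi = UserGroupInformation.createProxyUser("bob", realUgi)

    proxyUgi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        val current = UserGroupInformation.getCurrentUser()
        // Inside doAs, the current user is the proxy ("bob")...
        println(s"current user: ${current.getUserName()}")
        // ...but getRealUser() still points at the authenticated user.
        println(s"real user:    ${current.getRealUser().getUserName()}")
      }
    })
  }
}
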
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index d3acaf229cc856e7ebd1a14d731f2c59f8e63676..9202bd892f01bd5828d905aff841536517490456 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     hadoopConf.set("hive.metastore.uris", "http://localhost:0")
     val util = new YarnSparkHadoopUtil
     assertNestedHiveException(intercept[InvocationTargetException] {
-      util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+      util.obtainTokenForHiveMetastoreInner(hadoopConf)
     })
     assertNestedHiveException(intercept[InvocationTargetException] {
       util.obtainTokenForHiveMetastore(hadoopConf)
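
The unwrap-and-rethrow pattern in doAsRealUser() can be exercised directly. A hedged sketch, again assuming hadoop-common on the classpath; exactly which wrapper surfaces depends on how UGI.doAs handles the thrown exception, hence both catch arms.

import java.io.IOException
import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Sketch only: demonstrates the unwrap used by doAsRealUser() above.
object UnwrapDemo {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.getCurrentUser()
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = throw new IOException("boom")
      })
    } catch {
      case e: UndeclaredThrowableException =>
        // Mirrors the patch's defensive unwrap of the original cause.
        println(s"unwrapped: ${Option(e.getCause).getOrElse(e)}")
      case e: IOException =>
        // UGI.doAs declares IOException, so it may also arrive directly.
        println(s"direct: $e")
    }
  }
}
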