diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 9b3f29970e8ad01691a6a816adf64d9f98fb26a3..faf8a2b77ef730475fb8ddce685ca288913be93b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -105,10 +105,6 @@ private[hive] class HiveClientImpl(
 
     // Set up kerberos credentials for UserGroupInformation.loginUser within
     // current class loader
-    // Instead of using the spark conf of the current spark context, a new
-    // instance of SparkConf is needed for the original value of spark.yarn.keytab
-    // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
-    // keytab configuration for the link name in distributed cache
     if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
       val principalName = sparkConf.get("spark.yarn.principal")
       val keytabFileName = sparkConf.get("spark.yarn.keytab")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8a0c3f2536d83055c3b34f2d61b0b2195a275fe1..5280c420b988722ca3b97013a6242325beace242 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -101,6 +101,7 @@ private[spark] class Client(
   private var principal: String = null
   private var keytab: String = null
   private var credentials: Credentials = null
+  private var amKeytabFileName: String = null
 
   private val launcherBackend = new LauncherBackend() {
     override def onStopRequest(): Unit = {
@@ -503,7 +504,7 @@ private[spark] class Client(
       logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
         " via the YARN Secure Distributed Cache.")
       val (_, localizedPath) = distribute(keytab,
-        destName = sparkConf.get(KEYTAB),
+        destName = Some(amKeytabFileName),
         appMasterOnly = true)
       require(localizedPath != null, "Keytab file already distributed.")
     }
@@ -740,6 +741,9 @@ private[spark] class Client(
       // Save Spark configuration to a file in the archive.
       val props = new Properties()
       sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      // Override spark.yarn.keytab to point to the keytab's location in the distributed cache,
+      // which is where the AM will read it from.
+      Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }
       confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
       val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8)
       props.store(writer, "Spark configuration.")
@@ -1036,8 +1040,7 @@ private[spark] class Client(
       val f = new File(keytab)
       // Generate a file name that can be used for the keytab file, that does not conflict
       // with any user file.
-      val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      sparkConf.set(KEYTAB.key, keytabFileName)
+      amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
       sparkConf.set(PRINCIPAL.key, principal)
     }
     // Defensive copy of the credentials
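
For reference, the effect of the change is that the client no longer overwrites its own spark.yarn.keytab with the randomized distributed-cache name; that name is kept in amKeytabFileName and is only written into the properties file bundled for the AM, so HiveClientImpl on the client side keeps seeing the original local keytab path. The standalone Scala sketch below illustrates that pattern; the object name, the sample keytab path, and the mutable map standing in for SparkConf are illustrative assumptions, not code from yarn.Client.

import java.io.{File, StringWriter}
import java.util.{Properties, UUID}

// Sketch of the pattern the patch adopts (names and paths are illustrative, not
// part of yarn.Client): the randomized keytab name is kept in a local field and
// written only into the properties shipped to the AM, so the client-side
// configuration keeps the original local path.
object KeytabRenameSketch {
  def main(args: Array[String]): Unit = {
    val keytabKey = "spark.yarn.keytab"
    val localKeytab = "/etc/security/alice.keytab"   // hypothetical client-side path

    // Stand-in for the client's SparkConf.
    val clientConf = scala.collection.mutable.Map(keytabKey -> localKeytab)

    // Collision-free name used only as the distributed-cache destination for the AM.
    val amKeytabFileName = new File(localKeytab).getName + "-" + UUID.randomUUID()

    // Properties serialized for the AM: override the keytab key there, and only there.
    val props = new Properties()
    clientConf.foreach { case (k, v) => props.setProperty(k, v) }
    props.setProperty(keytabKey, amKeytabFileName)

    val out = new StringWriter()
    props.store(out, "Spark configuration.")

    println(s"client keytab : ${clientConf(keytabKey)}")        // still the local path
    println(s"AM keytab     : ${props.getProperty(keytabKey)}") // localized cache name
  }
}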