diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a00234c2b416cd8c6fe7f370704b7556e298679f..fa99cd3b64a4dd08a4e048473d38fb0b55d1bb6e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -100,6 +100,7 @@ private[spark] class Client(
   private var principal: String = null
   private var keytab: String = null
   private var credentials: Credentials = null
+  private var amKeytabFileName: String = null
 
   private val launcherBackend = new LauncherBackend() {
     override def onStopRequest(): Unit = {
@@ -471,7 +472,7 @@ private[spark] class Client(
       logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
         " via the YARN Secure Distributed Cache.")
       val (_, localizedPath) = distribute(keytab,
-        destName = sparkConf.get(KEYTAB),
+        destName = Some(amKeytabFileName),
         appMasterOnly = true)
       require(localizedPath != null, "Keytab file already distributed.")
     }
@@ -708,6 +709,9 @@ private[spark] class Client(
       // Save Spark configuration to a file in the archive.
       val props = new Properties()
       sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      // Override spark.yarn.key to point to the location in distributed cache which will be used
+      // by AM.
+      Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }
       confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
       val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8)
       props.store(writer, "Spark configuration.")
@@ -995,8 +999,7 @@ private[spark] class Client(
       val f = new File(keytab)
       // Generate a file name that can be used for the keytab file, that does not conflict
       // with any user file.
-      val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      sparkConf.set(KEYTAB.key, keytabFileName)
+      amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
       sparkConf.set(PRINCIPAL.key, principal)
     }
     // Defensive copy of the credentials
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dc9c3ff33542dc73a4a21c37cef69ef101777874..24dfd33bc36821fe76083061e138094c9b9481eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -106,10 +106,6 @@ private[hive] class HiveClientImpl(
 
     // Set up kerberos credentials for UserGroupInformation.loginUser within
     // current class loader
-    // Instead of using the spark conf of the current spark context, a new
-    // instance of SparkConf is needed for the original value of spark.yarn.keytab
-    // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
-    // keytab configuration for the link name in distributed cache
     if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
       val principalName = sparkConf.get("spark.yarn.principal")
       val keytabFileName = sparkConf.get("spark.yarn.keytab")