diff --git a/.gitignore b/.gitignore
index b3c4363af038e21e670ea5530a9ddfe0b8de8c69..399362f7d3e46c9e225c846659f850e71bb69c94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 *~
 *.swp
+*.ipr
 *.iml
+*.iws
 .idea/
 .settings
 .cache
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7c32e0ab9bcb5c41ecfad1688062d66abce86241..609e4e47e30488ac21437a3ba4b5980707029cb9 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,11 +42,12 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -60,7 +61,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a75066888c22f63ae374399cfbcb239abcdc5808..952171cd0ae15b598cdd92e7b57b732e396f8304 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,23 +50,23 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+
+  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
-
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
   // App files are world-wide readable and owner writable -> rw-r--r--
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
-
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
@@ -326,7 +326,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
@@ -482,10 +482,10 @@ object Client {
     // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
+    val sparkConf = new SparkConf()
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    val args = new ClientArguments(argStrings)
-
-    (new Client(args)).run()
+    new Client(args, sparkConf).run()
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -495,7 +495,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -503,7 +503,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 7aac2328dad60794c35142a64fc18b87a5a97973..1419f215c78e5831e7134adbb6e10d5fb6420960 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.MemoryParam
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -36,7 +36,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 99b824e1295a7d87b9e7753eb353eaf32c1f2673..09ac8d77ca0b76625599efe761d15e103fdfc57b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private var appAttemptId: ApplicationAttemptId = _
   private var reporterThread: Thread = _
@@ -47,7 +49,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var driverClosed:Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
-  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 9f5523c4b97a8811c14a19f9e34c6cd1e32028e7..b7699050bbaeb0f8dbf63b2f9a942eb3b1ab1bab 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -39,12 +39,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -197,7 +198,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 85ab08ef34db4738f53f95d3d68dadde555df3cb..9fbc783d601269409186f0a7cc4bc86365a6b32e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -280,6 +280,7 @@ private[yarn] class YarnAllocationHandler(
           val workerRunnable = new WorkerRunnable(
             container,
             conf,
+            sparkConf,
             driverUrl,
             workerId,
             workerHostname,
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b0ab10818f1d6ecd5d9655cd00327c8..324ef4616fe268b4005a2ec014fb0a8636d4e2e2 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d3eb8a47a966f1263610b2678262f85652..69170c74277b44787bb71472fb46bfab1036309f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -57,7 +59,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c97adbab917c069c83dc2abc9844e421c6..525ea7276208bba8bddcdf1e2671eb80fad93ca0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,17 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
+  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -307,7 +308,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
@@ -466,9 +467,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val args = new ClientArguments(argStrings)
+    val sparkConf =  new SparkConf
+    val args = new ClientArguments(argStrings,sparkConf)
 
-    new Client(args).run
+    new Client(args,sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +480,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -486,7 +488,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9075ca71e7efc38fa83068e91ffa78db400b4ec7..09303ae5c25f8b456da5d42d3a3236b1264c83b1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -34,7 +34,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index a8de89c67081187843d7af7dfa708dd6afa997e3..1a792ddf6669c6c96fd72a69d90cc5aff3a8087c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = null
@@ -46,7 +48,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var driverClosed:Boolean = false
-  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6a90cc51cfbaf71ef5683703dd84fa6d5b872ccb..5e5d0421bacb40584a892a7dcf493ede900933cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653b3ffab9191b586a015902b616e1952eff..e91257be8ed008585c0491fc0c1ce13a6258cd15 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
           }
 
           new Thread(
-            new WorkerRunnable(container, conf, driverUrl, workerId,
+            new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
               workerHostname, workerMemory, workerCores)
           ).start()
         }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f5078b0ab10818f1d6ecd5d9655cd00327c8..324ef4616fe268b4005a2ec014fb0a8636d4e2e2 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }