diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4049fc0c41c533b7149b53a7c978a620f6e6ee47..926e1ff7a874d10d93784670680267cf90262868 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -441,7 +441,6 @@ object SparkSubmit {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
@@ -452,27 +451,15 @@ object SparkSubmit {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
-      // Yarn client only
-      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      // Yarn only
+      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.instances"),
-      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
-      OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
-
-      // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
-      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
-      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
-      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
-      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
-      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
-      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
-      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
-      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
-      OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"),
-      OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
 
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -483,10 +470,11 @@ object SparkSubmit {
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.files"),
-      OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
-      OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+      OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
         sysProp = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
@@ -550,6 +538,10 @@ object SparkSubmit {
       if (args.isPython) {
         sysProps.put("spark.yarn.isPython", "true")
       }
+
+      if (args.pyFiles != null) {
+        sysProps("spark.submit.pyFiles") = args.pyFiles
+      }
     }
 
     // assure a keytab is available from any place in a JVM
@@ -576,9 +568,6 @@ object SparkSubmit {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
         childArgs += ("--primary-py-file", args.primaryResource)
-        if (args.pyFiles != null) {
-          childArgs += ("--py-files", args.pyFiles)
-        }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
         val mainFile = new Path(args.primaryResource).getName
@@ -627,7 +616,8 @@ object SparkSubmit {
       "spark.jars",
       "spark.files",
       "spark.yarn.dist.files",
-      "spark.yarn.dist.archives")
+      "spark.yarn.dist.archives",
+      "spark.yarn.dist.jars")
     pathConfigs.foreach { config =>
       // Replace old URIs with resolved URIs, if they exist
       sysProps.get(config).foreach { oldValue =>
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f2f20b32075774fee4b9b21be0f1ea4aa0816fce..968c5192ac676042a7574477c9a8bd11942d4835 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.internal
 
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.ByteUnit
 
 package object config {
 
@@ -33,6 +34,10 @@ package object config {
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
 
+  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+    .bytesConf(ByteUnit.MiB)
+    .withDefaultString("1g")
+
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
 
@@ -45,6 +50,10 @@ package object config {
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
 
+  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
+    .bytesConf(ByteUnit.MiB)
+    .withDefaultString("1g")
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
     .booleanConf.withDefault(false)
 
@@ -73,4 +82,9 @@ package object config {
 
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
 
+  private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+    .internal
+    .stringConf
+    .toSequence
+    .withDefault(Nil)
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 96cb4fd0ebeef3727eec787d5ea7b33abb195f09..271897699201b967358d46cd8c945d402bbf335e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -199,21 +199,21 @@ class SparkSubmitSuite
     val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     childArgsStr should include ("--class org.SomeClass")
-    childArgsStr should include ("--executor-memory 5g")
-    childArgsStr should include ("--driver-memory 4g")
-    childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
-    childArgsStr should include ("--queue thequeue")
     childArgsStr should include regex ("--jar .*thejar.jar")
-    childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
-    childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
-    childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
+
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.driver.memory") should be ("4g")
+    sysProps("spark.executor.cores") should be ("5")
+    sysProps("spark.yarn.queue") should be ("thequeue")
+    sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
+    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
     sysProps("spark.app.name") should be ("beauty")
     sysProps("spark.ui.enabled") should be ("false")
     sysProps("SPARK_SUBMIT") should be ("true")
-    sysProps.keys should not contain ("spark.jars")
   }
 
   test("handles YARN client mode") {
@@ -249,7 +249,8 @@ class SparkSubmitSuite
     sysProps("spark.executor.instances") should be ("6")
     sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
     sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
-    sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
+    sysProps("spark.yarn.dist.jars") should include
+      regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
     sysProps("spark.ui.enabled") should be ("false")
   }
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index c775fe710ffd593de1ff7e69aef27533164c467c..bb83272ec80e47f34e728ae08c0f7758fc97b3f9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -215,6 +215,13 @@ If you need a reference to the proper location to put log files in the YARN so t
     Comma-separated list of files to be placed in the working directory of each executor.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.dist.jars</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of jars to be placed in the working directory of each executor.
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.cores</code></td>
   <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e941089d1b0966da92be0a540455d2967a7aa6aa..9e8453429c9ba2f6170f02ad548717148f404ca5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -662,7 +662,7 @@ object ApplicationMaster extends Logging {
     SignalLogger.register(log)
     val amArgs = new ApplicationMasterArguments(args)
     SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
+      master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())
     }
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 6987e5a55fc321df34556dab57cb207e5f8de2c9..5cdec87667a5dea7ea3527cc28f1c4d32103c5f3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var primaryPyFile: String = null
   var primaryRFile: String = null
   var userArgs: Seq[String] = Nil
-  var executorMemory = 1024
-  var executorCores = 1
   var propertiesFile: String = null
 
   parseArgs(args.toList)
@@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
           primaryRFile = value
           args = tail
 
-        case ("--args" | "--arg") :: value :: tail =>
+        case ("--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
 
-        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
-          executorMemory = value
-          args = tail
-
-        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
-          executorCores = value
-          args = tail
-
         case ("--properties-file") :: value :: tail =>
           propertiesFile = value
           args = tail
@@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
       |  --class CLASS_NAME   Name of your application's main class
       |  --primary-py-file    A main Python file
       |  --primary-r-file     A main R file
-      |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
-      |                       place on the PYTHONPATH for Python apps.
-      |  --args ARGS          Arguments to be passed to your application's main class.
+      |  --arg ARG            Argument to be passed to your application's main class.
       |                       Multiple invocations are possible, each will be passed in order.
-      |  --executor-cores NUM   Number of cores for the executors (Default: 1)
-      |  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
       |  --properties-file FILE Path to a custom Spark properties file.
       """.stripMargin)
     // scalastyle:on println
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f0f13a16e091ddfe04eee33c1b6dcc2337fedaf5..4dd3ccdf3723d1c3dc972de1fb634d3390b81d76 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -64,21 +64,44 @@ private[spark] class Client(
   extends Logging {
 
   import Client._
+  import YarnSparkHadoopUtil._
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
     this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
 
   private val yarnClient = YarnClient.createYarnClient
   private val yarnConf = new YarnConfiguration(hadoopConf)
-  private var credentials: Credentials = null
-  private val amMemoryOverhead = args.amMemoryOverhead // MB
-  private val executorMemoryOverhead = args.executorMemoryOverhead // MB
+
+  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+
+  // AM related configurations
+  private val amMemory = if (isClusterMode) {
+    sparkConf.get(DRIVER_MEMORY).toInt
+  } else {
+    sparkConf.get(AM_MEMORY).toInt
+  }
+  private val amMemoryOverhead = {
+    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+    sparkConf.get(amMemoryOverheadEntry).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+  }
+  private val amCores = if (isClusterMode) {
+    sparkConf.get(DRIVER_CORES)
+  } else {
+    sparkConf.get(AM_CORES)
+  }
+
+  // Executor related configurations
+  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val isClusterMode = args.isClusterMode
 
   private var loginFromKeytab = false
   private var principal: String = null
   private var keytab: String = null
+  private var credentials: Credentials = null
 
   private val launcherBackend = new LauncherBackend() {
     override def onStopRequest(): Unit = {
@@ -179,8 +202,8 @@ private[spark] class Client(
       newApp: YarnClientApplication,
       containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
     val appContext = newApp.getApplicationSubmissionContext
-    appContext.setApplicationName(args.appName)
-    appContext.setQueue(args.amQueue)
+    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
+    appContext.setQueue(sparkConf.get(QUEUE_NAME))
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
 
@@ -217,8 +240,8 @@ private[spark] class Client(
     }
 
     val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(args.amMemory + amMemoryOverhead)
-    capability.setVirtualCores(args.amCores)
+    capability.setMemory(amMemory + amMemoryOverhead)
+    capability.setVirtualCores(amCores)
 
     sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
       case Some(expr) =>
@@ -272,16 +295,16 @@ private[spark] class Client(
     val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
     logInfo("Verifying our application has not requested more than the maximum " +
       s"memory capability of the cluster ($maxMem MB per container)")
-    val executorMem = args.executorMemory + executorMemoryOverhead
+    val executorMem = executorMemory + executorMemoryOverhead
     if (executorMem > maxMem) {
-      throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+      throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
         s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
         "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
         "'yarn.nodemanager.resource.memory-mb'.")
     }
-    val amMem = args.amMemory + amMemoryOverhead
+    val amMem = amMemory + amMemoryOverhead
     if (amMem > maxMem) {
-      throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+      throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
         s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
         "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
     }
@@ -493,17 +516,15 @@ private[spark] class Client(
      */
     val cachedSecondaryJarLinks = ListBuffer.empty[String]
     List(
-      (args.addJars, LocalResourceType.FILE, true),
-      (args.files, LocalResourceType.FILE, false),
-      (args.archives, LocalResourceType.ARCHIVE, false)
+      (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
+      (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
+      (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
     ).foreach { case (flist, resType, addToClasspath) =>
-      if (flist != null && !flist.isEmpty()) {
-        flist.split(',').foreach { file =>
-          val (_, localizedPath) = distribute(file, resType = resType)
-          require(localizedPath != null)
-          if (addToClasspath) {
-            cachedSecondaryJarLinks += localizedPath
-          }
+      flist.foreach { file =>
+        val (_, localizedPath) = distribute(file, resType = resType)
+        require(localizedPath != null)
+        if (addToClasspath) {
+          cachedSecondaryJarLinks += localizedPath
         }
       }
     }
@@ -519,7 +540,7 @@ private[spark] class Client(
 
     // The python files list needs to be treated especially. All files that are not an
     // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
-    args.pyFiles.foreach { f =>
+    sparkConf.get(PY_FILES).foreach { f =>
       val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
       distribute(f, targetDir = targetDir)
     }
@@ -678,7 +699,7 @@ private[spark] class Client(
     //
     // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
     val pythonPath = new ListBuffer[String]()
-    val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+    val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
     if (pyFiles.nonEmpty) {
       pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
         LOCALIZED_PYTHON_DIR)
@@ -775,7 +796,7 @@ private[spark] class Client(
     var prefixEnv: Option[String] = None
 
     // Add Xmx for AM memory
-    javaOpts += "-Xmx" + args.amMemory + "m"
+    javaOpts += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(
       YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
@@ -879,8 +900,6 @@ private[spark] class Client(
     val amArgs =
       Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
         userArgs ++ Seq(
-          "--executor-memory", args.executorMemory.toString + "m",
-          "--executor-cores", args.executorCores.toString,
           "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
             LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
@@ -919,10 +938,10 @@ private[spark] class Client(
   }
 
   def setupCredentials(): Unit = {
-    loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key)
+    loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
     if (loginFromKeytab) {
-      principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get
-      keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull
+      principal = sparkConf.get(PRINCIPAL).get
+      keytab = sparkConf.get(KEYTAB).orNull
 
       require(keytab != null, "Keytab must be specified when principal is specified.")
       logInfo("Attempting to login to the Kerberos" +
@@ -1084,7 +1103,7 @@ private[spark] class Client(
 
 }
 
-object Client extends Logging {
+private object Client extends Logging {
 
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
@@ -1097,11 +1116,7 @@ object Client extends Logging {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    val args = new ClientArguments(argStrings, sparkConf)
-    // to maintain backwards-compatibility
-    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors)
-    }
+    val args = new ClientArguments(argStrings)
     new Client(args, sparkConf).run()
   }
 
@@ -1246,7 +1261,7 @@ object Client extends Logging {
 
       val secondaryJars =
         if (args != null) {
-          getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq))
+          getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE)))
         } else {
           getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
         }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 47b4cc300907b1140a8afaa2b81ce2267d46fd1b..61c027ec4483ab6646905642ab9f86500fde1fa7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -19,118 +19,20 @@ package org.apache.spark.deploy.yarn
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config._
-import org.apache.spark.util.{IntParam, MemoryParam, Utils}
-
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-private[spark] class ClientArguments(
-    args: Array[String],
-    sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String]) {
 
-  var addJars: String = null
-  var files: String = null
-  var archives: String = null
   var userJar: String = null
   var userClass: String = null
-  var pyFiles: Seq[String] = Nil
   var primaryPyFile: String = null
   var primaryRFile: String = null
   var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var executorMemory = 1024 // MB
-  var executorCores = 1
-  var numExecutors = DEFAULT_NUMBER_EXECUTORS
-  var amQueue = sparkConf.get(QUEUE_NAME)
-  var amMemory: Int = _
-  var amCores: Int = _
-  var appName: String = "Spark"
-  var priority = 0
-  var principal: String = null
-  var keytab: String = null
-  def isClusterMode: Boolean = userClass != null
-
-  private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB
-  private var driverCores: Int = 1
-  private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
 
   parseArgs(args.toList)
-  loadEnvironmentArgs()
-  validateArgs()
-
-  // Additional memory to allocate to containers
-  val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
-  val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
-  val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
-
-  /** Load any default arguments provided through environment variables and Spark properties. */
-  private def loadEnvironmentArgs(): Unit = {
-    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
-    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
-    files = Option(files)
-      .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
-      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
-      .orNull
-    archives = Option(archives)
-      .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
-      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
-      .orNull
-    // If dynamic allocation is enabled, start at the configured initial number of executors.
-    // Default to minExecutors if no initialExecutors is set.
-    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
-    principal = Option(principal)
-      .orElse(sparkConf.get(PRINCIPAL))
-      .orNull
-    keytab = Option(keytab)
-      .orElse(sparkConf.get(KEYTAB))
-      .orNull
-  }
-
-  /**
-   * Fail fast if any arguments provided are invalid.
-   * This is intended to be called only after the provided arguments have been parsed.
-   */
-  private def validateArgs(): Unit = {
-    if (numExecutors < 0 || (!isDynamicAllocationEnabled && numExecutors == 0)) {
-      throw new IllegalArgumentException(
-        s"""
-           |Number of executors was $numExecutors, but must be at least 1
-           |(or 0 if dynamic executor allocation is enabled).
-           |${getUsageMessage()}
-         """.stripMargin)
-    }
-    if (executorCores < sparkConf.get(CPUS_PER_TASK)) {
-      throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.")
-    }
-    // scalastyle:off println
-    if (isClusterMode) {
-      for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) {
-        if (sparkConf.contains(key)) {
-          println(s"$key is set but does not apply in cluster mode.")
-        }
-      }
-      amMemory = driverMemory
-      amCores = driverCores
-    } else {
-      for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) {
-        if (sparkConf.contains(key)) {
-          println(s"$key is set but does not apply in client mode.")
-        }
-      }
-      amMemory = sparkConf.get(AM_MEMORY).toInt
-      amCores = sparkConf.get(AM_CORES)
-    }
-    // scalastyle:on println
-  }
 
   private def parseArgs(inputArgs: List[String]): Unit = {
     var args = inputArgs
 
-    // scalastyle:off println
     while (!args.isEmpty) {
       args match {
         case ("--jar") :: value :: tail =>
@@ -149,88 +51,16 @@ private[spark] class ClientArguments(
           primaryRFile = value
           args = tail
 
-        case ("--args" | "--arg") :: value :: tail =>
-          if (args(0) == "--args") {
-            println("--args is deprecated. Use --arg instead.")
-          }
+        case ("--arg") :: value :: tail =>
           userArgs += value
           args = tail
 
-        case ("--master-class" | "--am-class") :: value :: tail =>
-          println(s"${args(0)} is deprecated and is not used anymore.")
-          args = tail
-
-        case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
-          if (args(0) == "--master-memory") {
-            println("--master-memory is deprecated. Use --driver-memory instead.")
-          }
-          driverMemory = value
-          args = tail
-
-        case ("--driver-cores") :: IntParam(value) :: tail =>
-          driverCores = value
-          args = tail
-
-        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
-          if (args(0) == "--num-workers") {
-            println("--num-workers is deprecated. Use --num-executors instead.")
-          }
-          numExecutors = value
-          args = tail
-
-        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
-          if (args(0) == "--worker-memory") {
-            println("--worker-memory is deprecated. Use --executor-memory instead.")
-          }
-          executorMemory = value
-          args = tail
-
-        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
-          if (args(0) == "--worker-cores") {
-            println("--worker-cores is deprecated. Use --executor-cores instead.")
-          }
-          executorCores = value
-          args = tail
-
-        case ("--queue") :: value :: tail =>
-          amQueue = value
-          args = tail
-
-        case ("--name") :: value :: tail =>
-          appName = value
-          args = tail
-
-        case ("--addJars") :: value :: tail =>
-          addJars = value
-          args = tail
-
-        case ("--py-files") :: value :: tail =>
-          pyFiles = value.split(",")
-          args = tail
-
-        case ("--files") :: value :: tail =>
-          files = value
-          args = tail
-
-        case ("--archives") :: value :: tail =>
-          archives = value
-          args = tail
-
-        case ("--principal") :: value :: tail =>
-          principal = value
-          args = tail
-
-        case ("--keytab") :: value :: tail =>
-          keytab = value
-          args = tail
-
         case Nil =>
 
         case _ =>
           throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
-    // scalastyle:on println
 
     if (primaryPyFile != null && primaryRFile != null) {
       throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
@@ -240,7 +70,6 @@ private[spark] class ClientArguments(
 
   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-    val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
     message +
       s"""
       |Usage: org.apache.spark.deploy.yarn.Client [options]
@@ -252,20 +81,6 @@ private[spark] class ClientArguments(
       |  --primary-r-file         A main R file
       |  --arg ARG                Argument to be passed to your application's main class.
       |                           Multiple invocations are possible, each will be passed in order.
-      |  --num-executors NUM      Number of executors to start (Default: 2)
-      |  --executor-cores NUM     Number of cores per executor (Default: 1).
-      |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: $mem_mb Mb)
-      |  --driver-cores NUM       Number of cores used by the driver (Default: 1).
-      |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-      |  --name NAME              The name of your application (Default: Spark)
-      |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
-      |                           'default')
-      |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
-      |                           to work with.
-      |  --py-files PY_FILES      Comma-separated list of .zip, .egg, or .py files to
-      |                           place on the PYTHONPATH for Python apps.
-      |  --files files            Comma separated list of files to be distributed with the job.
-      |  --archives archives      Comma separated list of archives to be distributed with the job.
       """.stripMargin
   }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d09430236285eff60d45dd14b582bea590488ff3..7d71a642f62fc39d690cde6ffe4936b2628a6236 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -61,7 +62,6 @@ private[yarn] class YarnAllocator(
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
     securityMgr: SecurityManager)
   extends Logging {
 
@@ -107,12 +107,12 @@ private[yarn] class YarnAllocator(
   private val containerIdToExecutorId = new HashMap[ContainerId, String]
 
   // Executor memory in MB.
-  protected val executorMemory = args.executorMemory
+  protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
   // Additional memory overhead.
   protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
   // Number of cores per executor.
-  protected val executorCores = args.executorCores
+  protected val executorCores = sparkConf.get(EXECUTOR_CORES)
   // Resource capability requested for each executors
   private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 83d30b7352a076a39ec8a5e94de3171ee336602e..e7f75446641cbafa70cacf6dbbaddbff9eeb38cf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
 /**
  * Handles registering and unregistering the application with the YARN ResourceManager.
  */
-private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging {
+private[spark] class YarnRMClient extends Logging {
 
   private var amClient: AMRMClient[ContainerRequest] = _
   private var uiHistoryAddress: String = _
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
-      securityMgr)
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr)
   }
 
   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2915e664beffe880a72959463570e8c062b046c9..5af2c2980879dc4c8d1bf016eb4c4186fadaae97 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -512,7 +512,7 @@ object YarnSparkHadoopUtil {
       val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
       val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
-        s"initial executor number $initialNumExecutors must between min executor number" +
+        s"initial executor number $initialNumExecutors must between min executor number " +
           s"$minNumExecutors and max executor number $maxNumExecutors")
 
       initialNumExecutors
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 0789567ae6a184ddc4623a76748155212da18919..a3b9134b582835c612c01591c64c89ac21233ac8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -85,11 +85,18 @@ package object config {
 
   private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
     .stringConf
-    .optional
+    .toSequence
+    .withDefault(Nil)
 
   private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
     .stringConf
-    .optional
+    .toSequence
+    .withDefault(Nil)
+
+  private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
+    .stringConf
+    .toSequence
+    .withDefault(Nil)
 
   private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
     .doc("Whether to preserve temporary files created by the job in HDFS.")
@@ -183,7 +190,7 @@ package object config {
 
   private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
     .intConf
-    .optional
+    .withDefault(1)
 
   private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
     .bytesConf(ByteUnit.MiB)
@@ -191,6 +198,10 @@ package object config {
 
   /* Executor configuration. */
 
+  private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
+    .intConf
+    .withDefault(1)
+
   private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
     .bytesConf(ByteUnit.MiB)
     .optional
@@ -245,5 +256,4 @@ package object config {
     .stringConf
     .toSequence
     .optional
-
 }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9fc727904b1ea9a393be71a42ac04d3fba907cb5..56dc0004d04cca29181cfd870967ddb6fda36055 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,11 +48,10 @@ private[spark] class YarnClientSchedulerBackend(
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += ("--arg", hostport)
-    argsArrayBuf ++= getExtraClientArguments
 
     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
-    val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedExecutors = args.numExecutors
+    val args = new ClientArguments(argsArrayBuf.toArray)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
     client = new Client(args, conf)
     bindToYarn(client.submitApplication(), None)
 
@@ -72,43 +71,6 @@ private[spark] class YarnClientSchedulerBackend(
     monitorThread.start()
   }
 
-  /**
-   * Return any extra command line arguments to be passed to Client provided in the form of
-   * environment variables or Spark properties.
-   */
-  private def getExtraClientArguments: Seq[String] = {
-    val extraArgs = new ArrayBuffer[String]
-    // List of (target Client argument, environment variable, Spark property)
-    val optionTuples =
-      List(
-        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-        ("--py-files", null, "spark.submit.pyFiles")
-      )
-    // Warn against the following deprecated environment variables: env var -> suggestion
-    val deprecatedEnvVars = Map(
-      "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
-      "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
-    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
-      if (sc.getConf.contains(sparkProp)) {
-        extraArgs += (optionName, sc.getConf.get(sparkProp))
-      } else if (envVar != null && System.getenv(envVar) != null) {
-        extraArgs += (optionName, System.getenv(envVar))
-        if (deprecatedEnvVars.contains(envVar)) {
-          logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
-        }
-      }
-    }
-    // The app name is a special case because "spark.app.name" is required of all applications.
-    // As a result, the corresponding "SPARK_YARN_APP_NAME" is already handled preemptively in
-    // SparkSubmitArguments if "spark.app.name" is not explicitly set by the user. (SPARK-5222)
-    sc.getConf.getOption("spark.app.name").foreach(v => extraArgs += ("--name", v))
-    extraArgs
-  }
-
   /**
    * Report the state of the application until it is running.
    * If the application has finished, failed or been killed in the process, throw an exception.
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 64723c361c0aba8567dd47777c61c511d05548d0..2eaafa072a3aeb65c6b28a40aa81b9b230440020 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -118,8 +118,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConf()
       .set(SPARK_JARS, Seq(SPARK))
       .set(USER_CLASS_PATH_FIRST, true)
+      .set("spark.yarn.dist.jars", ADDED)
     val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+    val args = new ClientArguments(Array("--jar", USER))
 
     populateClasspath(args, conf, sparkConf, env)
 
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }
 
   test("Jar path propagation through SparkConf") {
-    val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
-    val client = createClient(sparkConf,
-      args = Array("--jar", USER, "--addJars", ADDED))
+    val conf = new Configuration()
+    val sparkConf = new SparkConf()
+      .set(SPARK_JARS, Seq(SPARK))
+      .set("spark.yarn.dist.jars", ADDED)
+    val client = createClient(sparkConf, args = Array("--jar", USER))
 
     val tempDir = Utils.createTempDir()
     try {
@@ -192,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConf()
       .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
       .set(MAX_APP_ATTEMPTS, 42)
-    val args = new ClientArguments(Array(
-      "--name", "foo-test-app",
-      "--queue", "staging-queue"), sparkConf)
+      .set("spark.app.name", "foo-test-app")
+      .set(QUEUE_NAME, "staging-queue")
+    val args = new ClientArguments(Array())
 
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       sparkConf: SparkConf,
       conf: Configuration = new Configuration(),
       args: Array[String] = Array()): Client = {
-    val clientArgs = new ClientArguments(args, sparkConf)
+    val clientArgs = new ClientArguments(args)
     val client = spy(new Client(clientArgs, conf, sparkConf))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort())
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a334b49291d595b9f8ab51d685f68a5c3..a641a6e73e8530ca0a17dd3e0aa6206c27551f03 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
     val args = Array(
-      "--executor-cores", "5",
-      "--executor-memory", "2048",
       "--jar", "somejar.jar",
       "--class", "SomeClass")
     val sparkConfClone = sparkConf.clone()
-    sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+    sparkConfClone
+      .set("spark.executor.instances", maxExecutors.toString)
+      .set("spark.executor.cores", "5")
+      .set("spark.executor.memory", "2048")
     new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConfClone,
       rmClient,
       appAttemptId,
-      new ApplicationMasterArguments(args),
       new SecurityManager(sparkConf))
   }
 
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ecabc640106f7f65db2b9ac240dba356..b2b4d84f53d8500643ff3be5dfc0496cef59ea84 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }
 
+  test("run Spark in yarn-client mode with different configurations") {
+    testBasicYarnApp(true,
+      Map(
+        "spark.driver.memory" -> "512m",
+        "spark.executor.cores" -> "1",
+        "spark.executor.memory" -> "512m",
+        "spark.executor.instances" -> "2"
+      ))
+  }
+
+  test("run Spark in yarn-cluster mode with different configurations") {
+    testBasicYarnApp(true,
+      Map(
+        "spark.driver.memory" -> "512m",
+        "spark.driver.cores" -> "1",
+        "spark.executor.cores" -> "1",
+        "spark.executor.memory" -> "512m",
+        "spark.executor.instances" -> "2"
+      ))
+  }
+
+  test("run Spark in yarn-client mode with additional jar") {
+    testWithAddJar(true)
+  }
+
+  test("run Spark in yarn-cluster mode with additional jar") {
+    testWithAddJar(false)
+  }
+
   test("run Spark in yarn-cluster mode unsuccessfully") {
     // Don't provide arguments so the driver will fail.
     val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }
   }
 
-  private def testBasicYarnApp(clientMode: Boolean): Unit = {
+  private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath()))
+      appArgs = Seq(result.getAbsolutePath()),
+      extraConf = conf)
     checkResult(finalState, result)
   }
 
+  private def testWithAddJar(clientMode: Boolean): Unit = {
+    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+    val driverResult = File.createTempFile("driver", null, tempDir)
+    val executorResult = File.createTempFile("executor", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + originalJar.getPath()))
+    checkResult(finalState, driverResult, "ORIGINAL")
+    checkResult(finalState, executorResult, "ORIGINAL")
+  }
+
   private def testPySpark(clientMode: Boolean): Unit = {
     val primaryPyFile = new File(tempDir, "test.py")
     Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)