diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b3d8e0cd7cdcdbfbf27a636f7897e4dc36a16c0f..ccc527306d9200c1625d02a666b9b4f6a518a88d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -159,10 +159,10 @@ public class JavaUtils {
       .build();
 
   /**
-   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
-   * internal use. If no suffix is provided a direct conversion is attempted.
+   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
+   * The unit is also considered the default if the given string does not specify a unit.
    */
-  private static long parseTimeString(String str, TimeUnit unit) {
+  public static long timeStringAs(String str, TimeUnit unit) {
     String lower = str.toLowerCase().trim();
 
     try {
@@ -195,7 +195,7 @@ public class JavaUtils {
    * no suffix is provided, the passed number is assumed to be in ms.
    */
   public static long timeStringAsMs(String str) {
-    return parseTimeString(str, TimeUnit.MILLISECONDS);
+    return timeStringAs(str, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -203,15 +203,14 @@ public class JavaUtils {
    * no suffix is provided, the passed number is assumed to be in seconds.
    */
   public static long timeStringAsSec(String str) {
-    return parseTimeString(str, TimeUnit.SECONDS);
+    return timeStringAs(str, TimeUnit.SECONDS);
   }
 
   /**
-   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a ByteUnit for
-   * internal use. If no suffix is provided a direct conversion of the provided default is
-   * attempted.
+   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given. If no suffix is
+   * provided, a direct conversion to the provided unit is attempted.
    */
-  private static long parseByteString(String str, ByteUnit unit) {
+  public static long byteStringAs(String str, ByteUnit unit) {
     String lower = str.toLowerCase().trim();
 
     try {
@@ -252,7 +251,7 @@ public class JavaUtils {
    * If no suffix is provided, the passed number is assumed to be in bytes.
    */
   public static long byteStringAsBytes(String str) {
-    return parseByteString(str, ByteUnit.BYTE);
+    return byteStringAs(str, ByteUnit.BYTE);
   }
 
   /**
@@ -262,7 +261,7 @@ public class JavaUtils {
    * If no suffix is provided, the passed number is assumed to be in kibibytes.
    */
   public static long byteStringAsKb(String str) {
-    return parseByteString(str, ByteUnit.KiB);
+    return byteStringAs(str, ByteUnit.KiB);
   }
 
   /**
@@ -272,7 +271,7 @@ public class JavaUtils {
    * If no suffix is provided, the passed number is assumed to be in mebibytes.
    */
   public static long byteStringAsMb(String str) {
-    return parseByteString(str, ByteUnit.MiB);
+    return byteStringAs(str, ByteUnit.MiB);
   }
 
   /**
@@ -282,7 +281,7 @@ public class JavaUtils {
    * If no suffix is provided, the passed number is assumed to be in gibibytes.
    */
   public static long byteStringAsGb(String str) {
-    return parseByteString(str, ByteUnit.GiB);
+    return byteStringAs(str, ByteUnit.GiB);
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b81bfb31822125945d1551475eebe18d35291652..16423e771a3de5ce782abc85fdb4ecb87e669aa8 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark
 
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -74,6 +76,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
+    set(entry.key, entry.stringConverter(value))
+    this
+  }
+
+  private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+    set(entry.key, entry.rawStringConverter(value))
+    this
+  }
+
   /**
    * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
    * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
@@ -148,6 +160,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = {
+    if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) {
+      logDeprecationWarning(entry.key)
+    }
+    this
+  }
+
+  private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+    if (settings.putIfAbsent(entry.key, entry.rawStringConverter(value)) == null) {
+      logDeprecationWarning(entry.key)
+    }
+    this
+  }
+
   /**
    * Use Kryo serialization and register the given set of classes with Kryo.
    * If called multiple times, this will append the classes from all calls together.
@@ -198,6 +224,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getOption(key).getOrElse(defaultValue)
   }
 
+  /**
+   * Retrieves the value of a pre-defined configuration entry.
+   *
+   * - This is an internal Spark API.
+   * - The return type if defined by the configuration entry.
+   * - This will throw an exception is the config is not optional and the value is not set.
+   */
+  private[spark] def get[T](entry: ConfigEntry[T]): T = {
+    entry.readFrom(this)
+  }
+
   /**
    * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then seconds are assumed.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
new file mode 100644
index 0000000000000000000000000000000000000000..770b43697a176d653c4f7c3a03322d431c9cb578
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
+private object ConfigHelpers {
+
+  def toNumber[T](s: String, converter: String => T, key: String, configType: String): T = {
+    try {
+      converter(s)
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalArgumentException(s"$key should be $configType, but was $s")
+    }
+  }
+
+  def toBoolean(s: String, key: String): Boolean = {
+    try {
+      s.toBoolean
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new IllegalArgumentException(s"$key should be boolean, but was $s")
+    }
+  }
+
+  def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
+    str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+  }
+
+  def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
+    v.map(stringConverter).mkString(",")
+  }
+
+  def timeFromString(str: String, unit: TimeUnit): Long = JavaUtils.timeStringAs(str, unit)
+
+  def timeToString(v: Long, unit: TimeUnit): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms"
+
+  def byteFromString(str: String, unit: ByteUnit): Long = {
+    val (input, multiplier) =
+      if (str.length() > 0 && str.charAt(0) == '-') {
+        (str.substring(1), -1)
+      } else {
+        (str, 1)
+      }
+    multiplier * JavaUtils.byteStringAs(input, unit)
+  }
+
+  def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
+
+}
+
+/**
+ * A type-safe config builder. Provides methods for transforming the input data (which can be
+ * used, e.g., for validation) and creating the final config entry.
+ *
+ * One of the methods that return a [[ConfigEntry]] must be called to create a config entry that
+ * can be used with [[SparkConf]].
+ */
+private[spark] class TypedConfigBuilder[T](
+  val parent: ConfigBuilder,
+  val converter: String => T,
+  val stringConverter: T => String) {
+
+  import ConfigHelpers._
+
+  def this(parent: ConfigBuilder, converter: String => T) = {
+    this(parent, converter, Option(_).map(_.toString).orNull)
+  }
+
+  def transform(fn: T => T): TypedConfigBuilder[T] = {
+    new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
+  }
+
+  def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
+    transform { v =>
+      if (!validValues.contains(v)) {
+        throw new IllegalArgumentException(
+          s"The value of ${parent.key} should be one of ${validValues.mkString(", ")}, but was $v")
+      }
+      v
+    }
+  }
+
+  def toSequence: TypedConfigBuilder[Seq[T]] = {
+    new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
+  }
+
+  /** Creates a [[ConfigEntry]] that does not require a default value. */
+  def optional: OptionalConfigEntry[T] = {
+    new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+  }
+
+  /** Creates a [[ConfigEntry]] that has a default value. */
+  def withDefault(default: T): ConfigEntry[T] = {
+    val transformedDefault = converter(stringConverter(default))
+    new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
+      parent._doc, parent._public)
+  }
+
+  /**
+   * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
+   * [[String]] and must be a valid value for the entry.
+   */
+  def withDefaultString(default: String): ConfigEntry[T] = {
+    val typedDefault = converter(default)
+    new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
+      parent._public)
+  }
+
+}
+
+/**
+ * Basic builder for Spark configurations. Provides methods for creating type-specific builders.
+ *
+ * @see TypedConfigBuilder
+ */
+private[spark] case class ConfigBuilder(key: String) {
+
+  import ConfigHelpers._
+
+  var _public = true
+  var _doc = ""
+
+  def internal: ConfigBuilder = {
+    _public = false
+    this
+  }
+
+  def doc(s: String): ConfigBuilder = {
+    _doc = s
+    this
+  }
+
+  def intConf: TypedConfigBuilder[Int] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
+  }
+
+  def longConf: TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
+  }
+
+  def doubleConf: TypedConfigBuilder[Double] = {
+    new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
+  }
+
+  def booleanConf: TypedConfigBuilder[Boolean] = {
+    new TypedConfigBuilder(this, toBoolean(_, key))
+  }
+
+  def stringConf: TypedConfigBuilder[String] = {
+    new TypedConfigBuilder(this, v => v)
+  }
+
+  def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
+  }
+
+  def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+    new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
+  }
+
+  def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
+    new FallbackConfigEntry(key, _doc, _public, fallback)
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f7296b487c0e9f4011401bbbc34720dc6d2aa227
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import org.apache.spark.SparkConf
+
+/**
+ * An entry contains all meta information for a configuration.
+ *
+ * @param key the key for the configuration
+ * @param defaultValue the default value for the configuration
+ * @param valueConverter how to convert a string to the value. It should throw an exception if the
+ *                       string does not have the required format.
+ * @param stringConverter how to convert a value to a string that the user can use it as a valid
+ *                        string value. It's usually `toString`. But sometimes, a custom converter
+ *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
+ *                        `List(a, b, c)`.
+ * @param doc the documentation for the configuration
+ * @param isPublic if this configuration is public to the user. If it's `false`, this
+ *                 configuration is only used internally and we should not expose it to users.
+ * @tparam T the value type
+ */
+private[spark] abstract class ConfigEntry[T] (
+    val key: String,
+    val valueConverter: String => T,
+    val stringConverter: T => String,
+    val doc: String,
+    val isPublic: Boolean) {
+
+  def defaultValueString: String
+
+  def readFrom(conf: SparkConf): T
+
+  // This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
+  // use readFrom().
+  def defaultValue: Option[T] = None
+
+  override def toString: String = {
+    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
+  }
+}
+
+private class ConfigEntryWithDefault[T] (
+    key: String,
+    _defaultValue: T,
+    valueConverter: String => T,
+    stringConverter: T => String,
+    doc: String,
+    isPublic: Boolean)
+    extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+
+  override def defaultValue: Option[T] = Some(_defaultValue)
+
+  override def defaultValueString: String = stringConverter(_defaultValue)
+
+  override def readFrom(conf: SparkConf): T = {
+    conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
+  }
+
+}
+
+/**
+ * A config entry that does not have a default value.
+ */
+private[spark] class OptionalConfigEntry[T](
+    key: String,
+    val rawValueConverter: String => T,
+    val rawStringConverter: T => String,
+    doc: String,
+    isPublic: Boolean)
+    extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
+      v => v.map(rawStringConverter).orNull, doc, isPublic) {
+
+  override def defaultValueString: String = "<undefined>"
+
+  override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
+
+}
+
+/**
+ * A config entry whose default value is defined by another config entry.
+ */
+private class FallbackConfigEntry[T] (
+    key: String,
+    doc: String,
+    isPublic: Boolean,
+    private val fallback: ConfigEntry[T])
+    extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+
+  override def defaultValueString: String = s"<value of ${fallback.key}>"
+
+  override def readFrom(conf: SparkConf): T = {
+    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f2f20b32075774fee4b9b21be0f1ea4aa0816fce
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import org.apache.spark.launcher.SparkLauncher
+
+package object config {
+
+  private[spark] val DRIVER_CLASS_PATH =
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+
+  private[spark] val DRIVER_JAVA_OPTIONS =
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+  private[spark] val DRIVER_LIBRARY_PATH =
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+
+  private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
+    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+
+  private[spark] val EXECUTOR_CLASS_PATH =
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+
+  private[spark] val EXECUTOR_JAVA_OPTIONS =
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+  private[spark] val EXECUTOR_LIBRARY_PATH =
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+
+  private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
+    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+
+  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
+    .booleanConf.withDefault(false)
+
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+
+  private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
+    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+
+  private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
+    ConfigBuilder("spark.dynamicAllocation.initialExecutors")
+      .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
+
+  private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
+    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+
+  private[spark] val SHUFFLE_SERVICE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+
+  private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
+    .doc("Location of user's keytab.")
+    .stringConf.optional
+
+  private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
+    .doc("Name of the Kerberos principal.")
+    .stringConf.optional
+
+  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+
+}
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0644148eaea5699bd6e96d1ebfd2e13ddc866373
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.ByteUnit
+
+class ConfigEntrySuite extends SparkFunSuite {
+
+  test("conf entry: int") {
+    val conf = new SparkConf()
+    val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+    assert(conf.get(iConf) === 1)
+    conf.set(iConf, 2)
+    assert(conf.get(iConf) === 2)
+  }
+
+  test("conf entry: long") {
+    val conf = new SparkConf()
+    val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+    conf.set(lConf, 1234L)
+    assert(conf.get(lConf) === 1234L)
+  }
+
+  test("conf entry: double") {
+    val conf = new SparkConf()
+    val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+    conf.set(dConf, 20.0)
+    assert(conf.get(dConf) === 20.0)
+  }
+
+  test("conf entry: boolean") {
+    val conf = new SparkConf()
+    val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+    assert(!conf.get(bConf))
+    conf.set(bConf, true)
+    assert(conf.get(bConf))
+  }
+
+  test("conf entry: optional") {
+    val conf = new SparkConf()
+    val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+    assert(conf.get(optionalConf) === None)
+    conf.set(optionalConf, 1)
+    assert(conf.get(optionalConf) === Some(1))
+  }
+
+  test("conf entry: fallback") {
+    val conf = new SparkConf()
+    val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+    val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
+    assert(conf.get(confWithFallback) === 1)
+    conf.set(confWithFallback, 2)
+    assert(conf.get(parentConf) === 1)
+    assert(conf.get(confWithFallback) === 2)
+  }
+
+  test("conf entry: time") {
+    val conf = new SparkConf()
+    val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+    assert(conf.get(time) === 3600L)
+    conf.set(time.key, "1m")
+    assert(conf.get(time) === 60L)
+  }
+
+  test("conf entry: bytes") {
+    val conf = new SparkConf()
+    val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+    assert(conf.get(bytes) === 1024L)
+    conf.set(bytes.key, "1k")
+    assert(conf.get(bytes) === 1L)
+  }
+
+  test("conf entry: string seq") {
+    val conf = new SparkConf()
+    val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+    conf.set(seq.key, "1,,2, 3 , , 4")
+    assert(conf.get(seq) === Seq("1", "2", "3", "4"))
+    conf.set(seq, Seq("1", "2"))
+    assert(conf.get(seq) === Seq("1", "2"))
+  }
+
+  test("conf entry: int seq") {
+    val conf = new SparkConf()
+    val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+    conf.set(seq.key, "1,,2, 3 , , 4")
+    assert(conf.get(seq) === Seq(1, 2, 3, 4))
+    conf.set(seq, Seq(1, 2))
+    assert(conf.get(seq) === Seq(1, 2))
+  }
+
+  test("conf entry: transformation") {
+    val conf = new SparkConf()
+    val transformationConf = ConfigBuilder("spark.transformation")
+      .stringConf
+      .transform(_.toLowerCase())
+      .withDefault("FOO")
+
+    assert(conf.get(transformationConf) === "foo")
+    conf.set(transformationConf, "BAR")
+    assert(conf.get(transformationConf) === "bar")
+  }
+
+  test("conf entry: valid values check") {
+    val conf = new SparkConf()
+    val enum = ConfigBuilder("spark.enum")
+      .stringConf
+      .checkValues(Set("a", "b", "c"))
+      .withDefault("a")
+    assert(conf.get(enum) === "a")
+
+    conf.set(enum, "b")
+    assert(conf.get(enum) === "b")
+
+    conf.set(enum, "d")
+    val enumError = intercept[IllegalArgumentException] {
+      conf.get(enum)
+    }
+    assert(enumError.getMessage === s"The value of ${enum.key} should be one of a, b, c, but was d")
+  }
+
+  test("conf entry: conversion error") {
+    val conf = new SparkConf()
+    val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+    conf.set(conversionTest.key, "abc")
+    val conversionError = intercept[IllegalArgumentException] {
+      conf.get(conversionTest)
+    }
+    assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc")
+  }
+
+  test("default value handling is null-safe") {
+    val conf = new SparkConf()
+    val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+    assert(conf.get(stringConf) === null)
+  }
+
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 70b67d21ecc7813bef4628cc26d1538adb47befd..6e95bb97105fd195181c76bd9468e21494f314f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.ThreadUtils
 
 /*
@@ -60,11 +62,9 @@ private[yarn] class AMDelegationTokenRenewer(
 
   private val hadoopUtil = YarnSparkHadoopUtil.get
 
-  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
-  private val daysToKeepFiles =
-    sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
-  private val numFilesToKeep =
-    sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
+  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
   private val freshHadoopConf =
     hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
@@ -76,8 +76,8 @@ private[yarn] class AMDelegationTokenRenewer(
    *
    */
   private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get("spark.yarn.principal")
-    val keytab = sparkConf.get("spark.yarn.keytab")
+    val principal = sparkConf.get(PRINCIPAL).get
+    val keytab = sparkConf.get(KEYTAB).get
 
     /**
      * Schedule re-login and creation of new tokens. If tokens have already expired, this method
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9f586bf4c1979b1c64fd59c291cb6cbec1763dfc..7d7bf88b9eb1215126fcc869aa393c795c3d7f39 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -65,16 +67,15 @@ private[spark] class ApplicationMaster(
   // allocation is enabled), with a minimum of 3.
 
   private val maxNumExecutorFailures = {
-    val defaultKey =
+    val effectiveNumExecutors =
       if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-        "spark.dynamicAllocation.maxExecutors"
+        sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       } else {
-        "spark.executor.instances"
+        sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
       }
-    val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
     val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
 
-    sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+    sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
   }
 
   @volatile private var exitCode = 0
@@ -95,14 +96,13 @@ private[spark] class ApplicationMaster(
   private val heartbeatInterval = {
     // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
     val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    math.max(0, math.min(expiryInterval / 2,
-      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+    math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
   }
 
   // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
   // being requested.
   private val initialAllocationInterval = math.min(heartbeatInterval,
-    sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+    sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))
 
   // Next wait interval before allocator poll.
   private var nextAllocationInterval = initialAllocationInterval
@@ -178,7 +178,7 @@ private[spark] class ApplicationMaster(
 
       // If the credentials file config is present, we must periodically renew tokens. So create
       // a new AMDelegationTokenRenewer
-      if (sparkConf.contains("spark.yarn.credentials.file")) {
+      if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
         delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
         // If a principal and keytab have been set, use that to create new credentials for executors
         // periodically
@@ -275,7 +275,7 @@ private[spark] class ApplicationMaster(
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
-      sparkConf.getOption("spark.yarn.historyServer.address")
+      sparkConf.get(HISTORY_SERVER_ADDRESS)
         .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
         .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
@@ -355,7 +355,7 @@ private[spark] class ApplicationMaster(
 
   private def launchReporterThread(): Thread = {
     // The number of failures in a row until Reporter thread give up
-    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
+    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
 
     val t = new Thread {
       override def run() {
@@ -429,7 +429,7 @@ private[spark] class ApplicationMaster(
   private def cleanupStagingDir(fs: FileSystem) {
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {
@@ -448,7 +448,7 @@ private[spark] class ApplicationMaster(
   private def waitForSparkContextInitialized(): SparkContext = {
     logInfo("Waiting for spark context initialization")
     sparkContextRef.synchronized {
-      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
+      val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
       val deadline = System.currentTimeMillis() + totalWaitTime
 
       while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
@@ -473,7 +473,7 @@ private[spark] class ApplicationMaster(
 
     // Spark driver should already be up since it launched us, but we don't want to
     // wait forever, so wait 100 seconds max to match the cluster mode setting.
-    val totalWaitTimeMs = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
+    val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
     val deadline = System.currentTimeMillis + totalWaitTimeMs
 
     while (!driverUp && !finished && System.currentTimeMillis < deadline) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index be45e9597f3018add45f2dbf450fad55a7e6ce84..36073de90d6afd9ce0e28112c8825a261f4de88a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.util.Utils
 
@@ -87,8 +89,7 @@ private[spark] class Client(
       }
     }
   }
-  private val fireAndForget = isClusterMode &&
-    !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
+  private val fireAndForget = isClusterMode && sparkConf.get(WAIT_FOR_APP_COMPLETION)
 
   private var appId: ApplicationId = null
 
@@ -156,7 +157,7 @@ private[spark] class Client(
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
     val appStagingDir = getAppStagingDir(appId)
     try {
-      val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       val stagingDirPath = new Path(appStagingDir)
       val fs = FileSystem.get(hadoopConf)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
@@ -181,39 +182,36 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
-    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
-      .map(StringUtils.getTrimmedStringCollection(_))
-      .filter(!_.isEmpty())
-      .foreach { tagCollection =>
-        try {
-          // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
-          // reflection to set it, printing a warning if a tag was specified but the YARN version
-          // doesn't support it.
-          val method = appContext.getClass().getMethod(
-            "setApplicationTags", classOf[java.util.Set[String]])
-          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
-        } catch {
-          case e: NoSuchMethodException =>
-            logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
-              "YARN does not support it")
-        }
+
+    sparkConf.get(APPLICATION_TAGS).foreach { tags =>
+      try {
+        // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+        // reflection to set it, printing a warning if a tag was specified but the YARN version
+        // doesn't support it.
+        val method = appContext.getClass().getMethod(
+          "setApplicationTags", classOf[java.util.Set[String]])
+        method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
+      } catch {
+        case e: NoSuchMethodException =>
+          logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
+            "YARN does not support it")
       }
-    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+    }
+    sparkConf.get(MAX_APP_ATTEMPTS) match {
       case Some(v) => appContext.setMaxAppAttempts(v)
-      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+      case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
           "Cluster's default value will be used.")
     }
 
-    if (sparkConf.contains("spark.yarn.am.attemptFailuresValidityInterval")) {
+    sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
       try {
-        val interval = sparkConf.getTimeAsMs("spark.yarn.am.attemptFailuresValidityInterval")
         val method = appContext.getClass().getMethod(
           "setAttemptFailuresValidityInterval", classOf[Long])
         method.invoke(appContext, interval: java.lang.Long)
       } catch {
         case e: NoSuchMethodException =>
-          logWarning("Ignoring spark.yarn.am.attemptFailuresValidityInterval because the version " +
-            "of YARN does not support it")
+          logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+            "the version of YARN does not support it")
       }
     }
 
@@ -221,28 +219,28 @@ private[spark] class Client(
     capability.setMemory(args.amMemory + amMemoryOverhead)
     capability.setVirtualCores(args.amCores)
 
-    if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
-      try {
-        val amRequest = Records.newRecord(classOf[ResourceRequest])
-        amRequest.setResourceName(ResourceRequest.ANY)
-        amRequest.setPriority(Priority.newInstance(0))
-        amRequest.setCapability(capability)
-        amRequest.setNumContainers(1)
-        val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
-        val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
-        method.invoke(amRequest, amLabelExpression)
-
-        val setResourceRequestMethod =
-          appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
-        setResourceRequestMethod.invoke(appContext, amRequest)
-      } catch {
-        case e: NoSuchMethodException =>
-          logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
-            "of YARN does not support it")
-          appContext.setResource(capability)
-      }
-    } else {
-      appContext.setResource(capability)
+    sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
+      case Some(expr) =>
+        try {
+          val amRequest = Records.newRecord(classOf[ResourceRequest])
+          amRequest.setResourceName(ResourceRequest.ANY)
+          amRequest.setPriority(Priority.newInstance(0))
+          amRequest.setCapability(capability)
+          amRequest.setNumContainers(1)
+          val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
+          method.invoke(amRequest, expr)
+
+          val setResourceRequestMethod =
+            appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
+          setResourceRequestMethod.invoke(appContext, amRequest)
+        } catch {
+          case e: NoSuchMethodException =>
+            logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
+              "of YARN does not support it")
+            appContext.setResource(capability)
+        }
+      case None =>
+        appContext.setResource(capability)
     }
 
     appContext
@@ -345,8 +343,8 @@ private[spark] class Client(
     YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
     YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
 
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
-      fs.getDefaultReplication(dst)).toShort
+    val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
+      .getOrElse(fs.getDefaultReplication(dst))
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
 
@@ -419,7 +417,7 @@ private[spark] class Client(
       logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
         " via the YARN Secure Distributed Cache.")
       val (_, localizedPath) = distribute(keytab,
-        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        destName = sparkConf.get(KEYTAB),
         appMasterOnly = true)
       require(localizedPath != null, "Keytab file already distributed.")
     }
@@ -433,8 +431,8 @@ private[spark] class Client(
      *   (3) Spark property key to set if the scheme is not local
      */
     List(
-      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
-      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+      (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
+      (APP_JAR_NAME, args.userJar, APP_JAR.key),
       ("log4j.properties", oldLog4jConf.orNull, null)
     ).foreach { case (destName, path, confKey) =>
       if (path != null && !path.trim().isEmpty()) {
@@ -472,7 +470,7 @@ private[spark] class Client(
       }
     }
     if (cachedSecondaryJarLinks.nonEmpty) {
-      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
     }
 
     if (isClusterMode && args.primaryPyFile != null) {
@@ -586,7 +584,7 @@ private[spark] class Client(
     val creds = new Credentials()
     val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
-      nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+      nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
     val t = creds.getAllTokens.asScala
       .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
       .head
@@ -606,8 +604,7 @@ private[spark] class Client(
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-    val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -615,11 +612,10 @@ private[spark] class Client(
       val remoteFs = FileSystem.get(hadoopConf)
       val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
-      sparkConf.set(
-        "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
+      sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
       val renewalInterval = getTokenRenewalInterval(stagingDirPath)
-      sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+      sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
     }
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
@@ -713,7 +709,7 @@ private[spark] class Client(
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
     val pySparkArchives =
-      if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
+      if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
       } else {
         Nil
@@ -766,36 +762,33 @@ private[spark] class Client(
 
     // Include driver-specific java options if we are launching a driver
     if (isClusterMode) {
-      val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
-        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
+      val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS"))
       driverOpts.foreach { opts =>
         javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
       }
-      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
+      val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
         sys.props.get("spark.driver.libraryPath")).flatten
       if (libraryPaths.nonEmpty) {
         prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
       }
-      if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
-        logWarning("spark.yarn.am.extraJavaOptions will not take effect in cluster mode")
+      if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
+        logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
       }
     } else {
       // Validate and include yarn am specific java options in yarn-client mode.
-      val amOptsKey = "spark.yarn.am.extraJavaOptions"
-      val amOpts = sparkConf.getOption(amOptsKey)
-      amOpts.foreach { opts =>
+      sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
         if (opts.contains("-Dspark")) {
-          val msg = s"$amOptsKey is not allowed to set Spark options (was '$opts'). "
+          val msg = s"$${amJavaOptions.key} is not allowed to set Spark options (was '$opts'). "
           throw new SparkException(msg)
         }
         if (opts.contains("-Xmx") || opts.contains("-Xms")) {
-          val msg = s"$amOptsKey is not allowed to alter memory settings (was '$opts')."
+          val msg = s"$${amJavaOptions.key} is not allowed to alter memory settings (was '$opts')."
           throw new SparkException(msg)
         }
         javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
       }
 
-      sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
+      sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
         prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
       }
     }
@@ -883,17 +876,10 @@ private[spark] class Client(
   }
 
   def setupCredentials(): Unit = {
-    loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal")
+    loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key)
     if (loginFromKeytab) {
-      principal =
-        if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal")
-      keytab = {
-        if (args.keytab != null) {
-          args.keytab
-        } else {
-          sparkConf.getOption("spark.yarn.keytab").orNull
-        }
-      }
+      principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get
+      keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull
 
       require(keytab != null, "Keytab must be specified when principal is specified.")
       logInfo("Attempting to login to the Kerberos" +
@@ -902,8 +888,8 @@ private[spark] class Client(
       // Generate a file name that can be used for the keytab file, that does not conflict
       // with any user file.
       val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      sparkConf.set("spark.yarn.keytab", keytabFileName)
-      sparkConf.set("spark.yarn.principal", principal)
+      sparkConf.set(KEYTAB.key, keytabFileName)
+      sparkConf.set(PRINCIPAL.key, principal)
     }
     credentials = UserGroupInformation.getCurrentUser.getCredentials
   }
@@ -923,7 +909,7 @@ private[spark] class Client(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
       logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+    val interval = sparkConf.get(REPORT_INTERVAL)
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
@@ -1071,14 +1057,14 @@ object Client extends Logging {
     val args = new ClientArguments(argStrings, sparkConf)
     // to maintain backwards-compatibility
     if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+      sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors)
     }
     new Client(args, sparkConf).run()
   }
 
   // Alias for the Spark assembly jar and the user jar
-  val SPARK_JAR: String = "__spark__.jar"
-  val APP_JAR: String = "__app__.jar"
+  val SPARK_JAR_NAME: String = "__spark__.jar"
+  val APP_JAR_NAME: String = "__app__.jar"
 
   // URI scheme that identifies local resources
   val LOCAL_SCHEME = "local"
@@ -1087,20 +1073,8 @@ object Client extends Logging {
   val SPARK_STAGING: String = ".sparkStaging"
 
   // Location of any user-defined Spark jars
-  val CONF_SPARK_JAR = "spark.yarn.jar"
   val ENV_SPARK_JAR = "SPARK_JAR"
 
-  // Internal config to propagate the location of the user's jar to the driver/executors
-  val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
-
-  // Internal config to propagate the locations of any extra jars to add to the classpath
-  // of the executors
-  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
-
-  // Comma-separated list of strings to pass through as YARN application tags appearing
-  // in YARN ApplicationReports, which can be used for filtering when querying YARN.
-  val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
-
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
@@ -1125,23 +1099,23 @@ object Client extends Logging {
    * Find the user-defined Spark jar if configured, or return the jar containing this
    * class if not.
    *
-   * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
+   * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
    * user environment if that is not found (for backwards compatibility).
    */
   private def sparkJar(conf: SparkConf): String = {
-    if (conf.contains(CONF_SPARK_JAR)) {
-      conf.get(CONF_SPARK_JAR)
-    } else if (System.getenv(ENV_SPARK_JAR) != null) {
-      logWarning(
-        s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
-          s"in favor of the $CONF_SPARK_JAR configuration variable.")
-      System.getenv(ENV_SPARK_JAR)
-    } else {
-      SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
-        + "find jar containing Spark classes. The jar can be defined using the "
-        + "spark.yarn.jar configuration option. If testing Spark, either set that option or "
-        + "make sure SPARK_PREPEND_CLASSES is not set."))
-    }
+    conf.get(SPARK_JAR).getOrElse(
+      if (System.getenv(ENV_SPARK_JAR) != null) {
+        logWarning(
+          s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
+            s"in favor of the ${SPARK_JAR.key} configuration variable.")
+        System.getenv(ENV_SPARK_JAR)
+      } else {
+        SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
+          + "find jar containing Spark classes. The jar can be defined using the "
+          + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
+          + "or make sure SPARK_PREPEND_CLASSES is not set."))
+      }
+    )
   }
 
   /**
@@ -1240,7 +1214,7 @@ object Client extends Logging {
           LOCALIZED_CONF_DIR, env)
     }
 
-    if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
+    if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first
       // we have to do the mainJar separate in order to send the right thing
       // into addFileToClasspath
@@ -1248,21 +1222,21 @@ object Client extends Logging {
         if (args != null) {
           getMainJarUri(Option(args.userJar))
         } else {
-          getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
+          getMainJarUri(sparkConf.get(APP_JAR))
         }
-      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
+      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR_NAME, env))
 
       val secondaryJars =
         if (args != null) {
-          getSecondaryJarUris(Option(args.addJars))
+          getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq))
         } else {
-          getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+          getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
         }
       secondaryJars.foreach { x =>
         addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1275,8 +1249,8 @@ object Client extends Logging {
    * @param conf Spark configuration.
    */
   def getUserClasspath(conf: SparkConf): Array[URI] = {
-    val mainUri = getMainJarUri(conf.getOption(CONF_SPARK_USER_JAR))
-    val secondaryUris = getSecondaryJarUris(conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+    val mainUri = getMainJarUri(conf.get(APP_JAR))
+    val secondaryUris = getSecondaryJarUris(conf.get(SECONDARY_JARS))
     (mainUri ++ secondaryUris).toArray
   }
 
@@ -1284,11 +1258,11 @@ object Client extends Logging {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
       if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
-    }.orElse(Some(new URI(APP_JAR)))
+    }.orElse(Some(new URI(APP_JAR_NAME)))
   }
 
-  private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = {
-    secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
+  private def getSecondaryJarUris(secondaryJars: Option[Seq[String]]): Seq[URI] = {
+    secondaryJars.getOrElse(Nil).map(new URI(_))
   }
 
   /**
@@ -1345,8 +1319,8 @@ object Client extends Logging {
    * If either config is not available, the input path is returned.
    */
   def getClusterPath(conf: SparkConf, path: String): String = {
-    val localPath = conf.get("spark.yarn.config.gatewayPath", null)
-    val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
+    val localPath = conf.get(GATEWAY_ROOT_PATH)
+    val clusterPath = conf.get(REPLACEMENT_ROOT_PATH)
     if (localPath != null && clusterPath != null) {
       path.replace(localPath, clusterPath)
     } else {
@@ -1405,9 +1379,9 @@ object Client extends Logging {
    */
   def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
     if (isDriver) {
-      conf.getBoolean("spark.driver.userClassPathFirst", false)
+      conf.get(DRIVER_USER_CLASS_PATH_FIRST)
     } else {
-      conf.getBoolean("spark.executor.userClassPathFirst", false)
+      conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
     }
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a9f43743573561a83e642e24f15caa8c689a7f21..47b4cc300907b1140a8afaa2b81ce2267d46fd1b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,10 +21,15 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
+private[spark] class ClientArguments(
+    args: Array[String],
+    sparkConf: SparkConf) {
+
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -37,9 +42,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var executorMemory = 1024 // MB
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
-  var amQueue = sparkConf.get("spark.yarn.queue", "default")
-  var amMemory: Int = 512 // MB
-  var amCores: Int = 1
+  var amQueue = sparkConf.get(QUEUE_NAME)
+  var amMemory: Int = _
+  var amCores: Int = _
   var appName: String = "Spark"
   var priority = 0
   var principal: String = null
@@ -48,11 +53,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
 
   private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB
   private var driverCores: Int = 1
-  private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
-  private val amMemKey = "spark.yarn.am.memory"
-  private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
-  private val driverCoresKey = "spark.driver.cores"
-  private val amCoresKey = "spark.yarn.am.cores"
   private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
 
   parseArgs(args.toList)
@@ -60,33 +60,33 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   validateArgs()
 
   // Additional memory to allocate to containers
-  val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey
-  val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
-    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+  val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+  val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
-  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+  val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
   /** Load any default arguments provided through environment variables and Spark properties. */
   private def loadEnvironmentArgs(): Unit = {
     // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
     // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
     files = Option(files)
-      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
+      .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
       .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
       .orNull
     archives = Option(archives)
-      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
+      .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
     numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
     principal = Option(principal)
-      .orElse(sparkConf.getOption("spark.yarn.principal"))
+      .orElse(sparkConf.get(PRINCIPAL))
       .orNull
     keytab = Option(keytab)
-      .orElse(sparkConf.getOption("spark.yarn.keytab"))
+      .orElse(sparkConf.get(KEYTAB))
       .orNull
   }
 
@@ -103,13 +103,12 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
            |${getUsageMessage()}
          """.stripMargin)
     }
-    if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
-      throw new SparkException("Executor cores must not be less than " +
-        "spark.task.cpus.")
+    if (executorCores < sparkConf.get(CPUS_PER_TASK)) {
+      throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.")
     }
     // scalastyle:off println
     if (isClusterMode) {
-      for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
+      for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) {
         if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in cluster mode.")
         }
@@ -117,17 +116,13 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       amMemory = driverMemory
       amCores = driverCores
     } else {
-      for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+      for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) {
         if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in client mode.")
         }
       }
-      sparkConf.getOption(amMemKey)
-        .map(Utils.memoryStringToMb)
-        .foreach { mem => amMemory = mem }
-      sparkConf.getOption(amCoresKey)
-        .map(_.toInt)
-        .foreach { cores => amCores = cores }
+      amMemory = sparkConf.get(AM_MEMORY).toInt
+      amCores = sparkConf.get(AM_CORES)
     }
     // scalastyle:on println
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 6474acc3dc9d2f8a5b31b76574a4189ba462f3fe..1ae278d76f027b613c05a724906c849f18c7ea31 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class ExecutorDelegationTokenUpdater(
@@ -34,7 +35,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
 
   @volatile private var lastCredentialsFileSuffix = 0
 
-  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
   private val freshHadoopConf =
     SparkHadoopUtil.get.getConfBypassingFSCache(
       hadoopConf, new Path(credentialsFile).toUri.getScheme)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 21ac04dc76c32229746ed024fff68f1f9792fac4..9f91d182ebc3275d585a4a482e2a84b02a636a4f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
 
-class ExecutorRunnable(
+private[yarn] class ExecutorRunnable(
     container: Container,
     conf: Configuration,
     sparkConf: SparkConf,
@@ -104,7 +106,7 @@ class ExecutorRunnable(
     // If external shuffle service is enabled, register with the Yarn shuffle service already
     // started on the NodeManager and, if authentication is enabled, provide it with our secret
     // key for fetching shuffle files later
-    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
       val secretString = securityMgr.getSecretKey()
       val secretBytes =
         if (secretString != null) {
@@ -148,13 +150,13 @@ class ExecutorRunnable(
     javaOpts += "-Xmx" + executorMemoryString
 
     // Set extra Java options for the executor, if defined
-    sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
       javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
     }
     sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
       javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
     }
-    sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+    sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
       prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
     }
 
@@ -286,8 +288,8 @@ class ExecutorRunnable(
 
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
+    Client.populateClasspath(null, yarnConf, sparkConf, env, false,
+      sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 2ec189de7c91449093b41661ddfde32ee0ad3060..8772e26f4314d8e3a6c9fca8d0fc16d5c094cf34 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
 
 private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
 
@@ -84,9 +85,6 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val yarnConf: Configuration,
     val resource: Resource) {
 
-  // Number of CPUs per task
-  private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
-
   /**
    * Calculate each container's node locality and rack locality
    * @param numContainer number of containers to calculate
@@ -159,7 +157,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
    */
   private def numExecutorsPending(numTasksPending: Int): Int = {
     val coresPerExecutor = resource.getVirtualCores
-    (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
   }
 
   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 11426eb07c7edecc42153fede41383f2dc218c67..a96cb4957be88e1a09a3447acd9774b2f78bd520 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,6 +34,7 @@ import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -107,21 +108,20 @@ private[yarn] class YarnAllocator(
   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
   // Additional memory overhead.
-  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+  protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
   // Number of cores per executor.
   protected val executorCores = args.executorCores
   // Resource capability requested for each executors
   private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
 
   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
-    "ContainerLauncher",
-    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
+    "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
 
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
 
-  private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
+  private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
   // ContainerRequest constructor that can take a node label expression. We grab it through
   // reflection because it's only available in later versions of YARN.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 98505b93dda36a57c2b4266df488e7fb64e904bd..968f63527616abee0262c334061750f336895e65 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -117,7 +118,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
 
   /** Returns the maximum number of attempts to register the AM. */
   def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
-    val sparkMaxAttempts = sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt)
+    val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
     val yarnMaxAttempts = yarnConf.getInt(
       YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
     val retval: Int = sparkMaxAttempts match {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index aef78fdfd4c570641ecf88742cba0a7f57879ff3..ed56d4bd44fe8e2ed0c1798adf8e2190369073e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
 
@@ -97,10 +99,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
    * Get the list of namenodes the user may access.
    */
   def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get("spark.yarn.access.namenodes", "")
-      .split(",")
-      .map(_.trim())
-      .filter(!_.isEmpty)
+    sparkConf.get(NAMENODES_TO_ACCESS)
       .map(new Path(_))
       .toSet
   }
@@ -335,7 +334,7 @@ object YarnSparkHadoopUtil {
   // the common cases. Memory overhead tends to grow with container size.
 
   val MEMORY_OVERHEAD_FACTOR = 0.10
-  val MEMORY_OVERHEAD_MIN = 384
+  val MEMORY_OVERHEAD_MIN = 384L
 
   val ANY_HOST = "*"
 
@@ -509,10 +508,9 @@ object YarnSparkHadoopUtil {
       conf: SparkConf,
       numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
-      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-      val initialNumExecutors =
-        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
-      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+      val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
         s"initial executor number $initialNumExecutors must between min executor number" +
           s"$minNumExecutors and max executor number $maxNumExecutors")
@@ -522,7 +520,7 @@ object YarnSparkHadoopUtil {
       val targetNumExecutors =
         sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
       // System property can override environment variable.
-      conf.getInt("spark.executor.instances", targetNumExecutors)
+      conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)
     }
   }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
new file mode 100644
index 0000000000000000000000000000000000000000..06c1be9bf0e07f58e23c81f7314743a5cd0ed4e7
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+package object config {
+
+  /* Common app configuration. */
+
+  private[spark] val APPLICATION_TAGS = ConfigBuilder("spark.yarn.tags")
+    .doc("Comma-separated list of strings to pass through as YARN application tags appearing " +
+      "in YARN Application Reports, which can be used for filtering when querying YARN.")
+    .stringConf
+    .toSequence
+    .optional
+
+  private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
+      .doc("Interval after which AM failures will be considered independent and " +
+        "not accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .optional
+
+  private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
+    .doc("Maximum number of AM attempts before failing the app.")
+    .intConf
+    .optional
+
+  private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
+    .doc("Whether to place user jars in front of Spark's classpath.")
+    .booleanConf
+    .withDefault(false)
+
+  private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
+    .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
+      "with the corresponding path in cluster machines.")
+    .stringConf
+    .withDefault(null)
+
+  private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
+    .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
+      "in the YARN cluster.")
+    .stringConf
+    .withDefault(null)
+
+  private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
+    .stringConf
+    .withDefault("default")
+
+  private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
+    .stringConf
+    .optional
+
+  /* File distribution. */
+
+  private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
+    .doc("Location of the Spark jar to use.")
+    .stringConf
+    .optional
+
+  private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
+    .stringConf
+    .optional
+
+  private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
+    .stringConf
+    .optional
+
+  private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
+    .doc("Whether to preserve temporary files created by the job in HDFS.")
+    .booleanConf
+    .withDefault(false)
+
+  private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
+    .doc("Replication factor for files uploaded by Spark to HDFS.")
+    .intConf
+    .optional
+
+  /* Cluster-mode launcher configuration. */
+
+  private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
+    .doc("In cluster mode, whether to wait for the application to finishe before exiting the " +
+      "launcher process.")
+    .booleanConf
+    .withDefault(true)
+
+  private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
+    .doc("Interval between reports of the current app status in cluster mode.")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .withDefaultString("1s")
+
+  /* Shared Client-mode AM / Driver configuration. */
+
+  private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .withDefaultString("100s")
+
+  private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
+    .doc("Node label expression for the AM.")
+    .stringConf
+    .optional
+
+  private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
+    ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
+      .intConf
+      .withDefault(25)
+
+  private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
+    .intConf
+    .optional
+
+  private[spark] val MAX_REPORTER_THREAD_FAILURES =
+    ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
+      .intConf
+      .withDefault(5)
+
+  private[spark] val RM_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .withDefaultString("3s")
+
+  private[spark] val INITIAL_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .withDefaultString("200ms")
+
+  private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
+    .doc("A comma-separated list of class names of services to add to the scheduler.")
+    .stringConf
+    .toSequence
+    .withDefault(Nil)
+
+  /* Client-mode AM configuration. */
+
+  private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
+    .intConf
+    .withDefault(1)
+
+  private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
+    .doc("Extra Java options for the client-mode AM.")
+    .stringConf
+    .optional
+
+  private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
+    .doc("Extra native library path for the client-mode AM.")
+    .stringConf
+    .optional
+
+  private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .optional
+
+  private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
+    .bytesConf(ByteUnit.MiB)
+    .withDefaultString("512m")
+
+  /* Driver configuration. */
+
+  private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
+    .intConf
+    .optional
+
+  private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .optional
+
+  /* Executor configuration. */
+
+  private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .optional
+
+  private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
+    ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
+      .doc("Node label expression for executors.")
+      .stringConf
+      .optional
+
+  /* Security configuration. */
+
+  private[spark] val CREDENTIAL_FILE_MAX_COUNT =
+    ConfigBuilder("spark.yarn.credentials.file.retention.count")
+      .intConf
+      .withDefault(5)
+
+  private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
+    ConfigBuilder("spark.yarn.credentials.file.retention.days")
+      .intConf
+      .withDefault(5)
+
+  private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
+    .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
+      "fs.defaultFS does not need to be listed here.")
+    .stringConf
+    .toSequence
+    .withDefault(Nil)
+
+  private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
+    .internal
+    .timeConf(TimeUnit.MILLISECONDS)
+    .optional
+
+  /* Private configs. */
+
+  private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
+    .internal
+    .stringConf
+    .withDefault(null)
+
+  // Internal config to propagate the location of the user's jar to the driver/executors
+  private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
+    .internal
+    .stringConf
+    .optional
+
+  // Internal config to propagate the locations of any extra jars to add to the classpath
+  // of the executors
+  private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
+    .internal
+    .stringConf
+    .toSequence
+    .optional
+
+}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
index c064521845399fca57682afe638dd7b9db505a96..c4757e335b6c6a9af11c38b6d4ab4851b4afb8d8 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -103,20 +104,15 @@ private[spark] class SchedulerExtensionServices extends SchedulerExtensionServic
     val attemptId = binding.attemptId
     logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
 
-    serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
-    services = serviceOption
-      .map { s =>
-        s.split(",").map(_.trim()).filter(!_.isEmpty)
-          .map { sClass =>
-            val instance = Utils.classForName(sClass)
-              .newInstance()
-              .asInstanceOf[SchedulerExtensionService]
-            // bind this service
-            instance.start(binding)
-            logInfo(s"Service $sClass started")
-            instance
-          }.toList
-      }.getOrElse(Nil)
+    services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
+      val instance = Utils.classForName(sClass)
+        .newInstance()
+        .asInstanceOf[SchedulerExtensionService]
+      // bind this service
+      instance.start(binding)
+      logInfo(s"Service $sClass started")
+      instance
+    }.toList
   }
 
   /**
@@ -144,11 +140,3 @@ private[spark] class SchedulerExtensionServices extends SchedulerExtensionServic
     | services=$services,
     | started=$started)""".stripMargin
 }
-
-private[spark] object SchedulerExtensionServices {
-
-  /**
-   * A list of comma separated services to instantiate in the scheduler
-   */
-  val SPARK_YARN_SERVICES = "spark.yarn.services"
-}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 19065373c6d5565a788df97d18d6bf9a60773205..b57c179d89bd2560bf6fe55d5147b09cec8fc7e9 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -41,6 +41,7 @@ import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
@@ -103,8 +104,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
   test("Local jar URIs") {
     val conf = new Configuration()
-    val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
-      .set("spark.yarn.user.classpath.first", "true")
+    val sparkConf = new SparkConf()
+      .set(SPARK_JAR, SPARK)
+      .set(USER_CLASS_PATH_FIRST, true)
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
@@ -129,13 +131,13 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       }
     cp should contain(pwdVar)
     cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
-    cp should not contain (Client.SPARK_JAR)
-    cp should not contain (Client.APP_JAR)
+    cp should not contain (Client.SPARK_JAR_NAME)
+    cp should not contain (Client.APP_JAR_NAME)
   }
 
   test("Jar path propagation through SparkConf") {
     val conf = new Configuration()
-    val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+    val sparkConf = new SparkConf().set(SPARK_JAR, SPARK)
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
     val client = spy(new Client(args, conf, sparkConf))
@@ -145,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val tempDir = Utils.createTempDir()
     try {
       client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
-      sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
+      sparkConf.get(APP_JAR) should be (Some(USER))
 
       // The non-local path should be propagated by name only, since it will end up in the app's
       // staging dir.
@@ -160,7 +162,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
         })
         .mkString(",")
 
-      sparkConf.getOption(Client.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
+      sparkConf.get(SECONDARY_JARS) should be (Some(expected.split(",").toSeq))
     } finally {
       Utils.deleteRecursively(tempDir)
     }
@@ -169,9 +171,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   test("Cluster path translation") {
     val conf = new Configuration()
     val sparkConf = new SparkConf()
-      .set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
-      .set("spark.yarn.config.gatewayPath", "/localPath")
-      .set("spark.yarn.config.replacementPath", "/remotePath")
+      .set(SPARK_JAR.key, "local:/localPath/spark.jar")
+      .set(GATEWAY_ROOT_PATH, "/localPath")
+      .set(REPLACEMENT_ROOT_PATH, "/remotePath")
 
     Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
     Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
@@ -191,8 +193,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
     // may not be removed depending on the version of Hadoop being used.
     val sparkConf = new SparkConf()
-      .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
-      .set("spark.yarn.maxAppAttempts", "42")
+      .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
+      .set(MAX_APP_ATTEMPTS, 42)
     val args = new ClientArguments(Array(
       "--name", "foo-test-app",
       "--queue", "staging-queue"), sparkConf)
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
index b4d1b0a3d22a7735830aaaca754170e2d1705c1e..338fbe2ef47fde2d5bd5068efba3d802dadb1ab2 100644
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 
 /**
  * Test the integration with [[SchedulerExtensionServices]]
@@ -36,8 +37,7 @@ class ExtensionServiceIntegrationSuite extends SparkFunSuite
    */
   before {
     val sparkConf = new SparkConf()
-    sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
-      classOf[SimpleExtensionService].getName())
+    sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
     sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
     sc = new SparkContext(sparkConf)
   }