diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 770b43697a176d653c4f7c3a03322d431c9cb578..5d50e3851a9f0f01feca643005bd1d23510902b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -85,10 +85,12 @@ private[spark] class TypedConfigBuilder[T](
     this(parent, converter, Option(_).map(_.toString).orNull)
   }
 
+  /** Apply a transformation to the user-provided values of the config entry. */
   def transform(fn: T => T): TypedConfigBuilder[T] = {
     new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
   }
 
+  /** Check that user-provided values for the config match a pre-defined set. */
   def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
     transform { v =>
       if (!validValues.contains(v)) {
@@ -99,30 +101,38 @@ private[spark] class TypedConfigBuilder[T](
     }
   }
 
+  /** Turns the config entry into a sequence of values of the underlying type. */
   def toSequence: TypedConfigBuilder[Seq[T]] = {
     new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
   }
 
-  /** Creates a [[ConfigEntry]] that does not require a default value. */
-  def optional: OptionalConfigEntry[T] = {
-    new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+  /** Creates a [[ConfigEntry]] that does not have a default value. */
+  def createOptional: OptionalConfigEntry[T] = {
+    val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
+      parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /** Creates a [[ConfigEntry]] that has a default value. */
-  def withDefault(default: T): ConfigEntry[T] = {
+  def createWithDefault(default: T): ConfigEntry[T] = {
     val transformedDefault = converter(stringConverter(default))
-    new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
-      parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+      stringConverter, parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /**
    * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
    * [[String]] and must be a valid value for the entry.
    */
-  def withDefaultString(default: String): ConfigEntry[T] = {
+  def createWithDefaultString(default: String): ConfigEntry[T] = {
     val typedDefault = converter(default)
-    new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
-      parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+      parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
 }
@@ -136,10 +146,11 @@ private[spark] case class ConfigBuilder(key: String) {
 
   import ConfigHelpers._
 
-  var _public = true
-  var _doc = ""
+  private[config] var _public = true
+  private[config] var _doc = ""
+  private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
 
-  def internal: ConfigBuilder = {
+  def internal(): ConfigBuilder = {
     _public = false
     this
   }
@@ -149,6 +160,15 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  /**
+   * Registers a callback for when the config entry is finally instantiated. Currently used by
+   * SQLConf to keep track of SQL configuration entries.
+   */
+  def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+    _onCreate = Option(callback)
+    this
+  }
+
   def intConf: TypedConfigBuilder[Int] = {
     new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
   }
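
For reference, a minimal sketch of how the renamed builder API reads after this change. The keys and values below are made up for illustration (they are not real Spark configs), and the code assumes it lives inside the org.apache.spark package, since ConfigBuilder and ConfigEntry are private[spark]:

    import org.apache.spark.internal.config._

    // Hypothetical entries showing createWithDefault / createOptional.
    val MAX_RETRIES = ConfigBuilder("spark.example.maxRetries")
      .doc("How many times to retry before giving up.")
      .intConf
      .createWithDefault(3)

    val SCRATCH_DIR = ConfigBuilder("spark.example.scratchDir")
      .internal()
      .stringConf
      .createOptional

    // The new onCreate hook lets a caller observe every entry built from this builder;
    // this is the mechanism SQLConf uses further down in this diff to register entries.
    val created = scala.collection.mutable.ArrayBuffer[ConfigEntry[_]]()
    val TRACKED = ConfigBuilder("spark.example.tracked")
      .onCreate(e => created += e)
      .booleanConf
      .createWithDefault(false)
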
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 968c5192ac676042a7574477c9a8bd11942d4835..94b50ee06520c7807d9782b711de4954c1929974 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -23,68 +23,70 @@ import org.apache.spark.network.util.ByteUnit
 package object config {
 
   private[spark] val DRIVER_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val DRIVER_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val DRIVER_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
   private[spark] val EXECUTOR_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val EXECUTOR_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
-  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
-    .booleanConf.withDefault(false)
+  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
+    .booleanConf.createWithDefault(false)
 
-  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
 
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
 
   private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.initialExecutors")
       .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
 
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
   private[spark] val SHUFFLE_SERVICE_ENABLED =
-    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
-    .stringConf.optional
+    .stringConf.createOptional
 
   private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
     .doc("Name of the Kerberos principal.")
-    .stringConf.optional
+    .stringConf.createOptional
 
-  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
+    .intConf
+    .createOptional
 
   private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
-    .internal
+    .internal()
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 }
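
The entries above are read back through SparkConf with the typed get(entry) overload, as the updated test suite below also shows. A small sketch, again assuming code inside org.apache.spark because both the entries and the typed accessor are private[spark]:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    val conf = new SparkConf()
    // Typed access falls back to the entry's default when the key is unset:
    val driverMemMiB: Long = conf.get(DRIVER_MEMORY)   // the 1g default, expressed in MiB
    val cpusPerTask: Int = conf.get(CPUS_PER_TASK)     // defaults to 1
    // Optional entries come back as Option[T]:
    val extraClassPath: Option[String] = conf.get(DRIVER_CLASS_PATH)
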
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 0644148eaea5699bd6e96d1ebfd2e13ddc866373..337fd7e85e81cf5900503a793e24b1ead76b000f 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -26,7 +26,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: int") {
     val conf = new SparkConf()
-    val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+    val iConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
     assert(conf.get(iConf) === 1)
     conf.set(iConf, 2)
     assert(conf.get(iConf) === 2)
@@ -34,21 +34,21 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: long") {
     val conf = new SparkConf()
-    val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+    val lConf = ConfigBuilder("spark.long").longConf.createWithDefault(0L)
     conf.set(lConf, 1234L)
     assert(conf.get(lConf) === 1234L)
   }
 
   test("conf entry: double") {
     val conf = new SparkConf()
-    val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+    val dConf = ConfigBuilder("spark.double").doubleConf.createWithDefault(0.0)
     conf.set(dConf, 20.0)
     assert(conf.get(dConf) === 20.0)
   }
 
   test("conf entry: boolean") {
     val conf = new SparkConf()
-    val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+    val bConf = ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false)
     assert(!conf.get(bConf))
     conf.set(bConf, true)
     assert(conf.get(bConf))
@@ -56,7 +56,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: optional") {
     val conf = new SparkConf()
-    val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+    val optionalConf = ConfigBuilder("spark.optional").intConf.createOptional
     assert(conf.get(optionalConf) === None)
     conf.set(optionalConf, 1)
     assert(conf.get(optionalConf) === Some(1))
@@ -64,7 +64,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: fallback") {
     val conf = new SparkConf()
-    val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+    val parentConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
     val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
     assert(conf.get(confWithFallback) === 1)
     conf.set(confWithFallback, 2)
@@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: time") {
     val conf = new SparkConf()
-    val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+    val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).createWithDefaultString("1h")
     assert(conf.get(time) === 3600L)
     conf.set(time.key, "1m")
     assert(conf.get(time) === 60L)
@@ -82,7 +82,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: bytes") {
     val conf = new SparkConf()
-    val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+    val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).createWithDefaultString("1m")
     assert(conf.get(bytes) === 1024L)
     conf.set(bytes.key, "1k")
     assert(conf.get(bytes) === 1L)
@@ -90,7 +90,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: string seq") {
     val conf = new SparkConf()
-    val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+    val seq = ConfigBuilder("spark.seq").stringConf.toSequence.createWithDefault(Seq())
     conf.set(seq.key, "1,,2, 3 , , 4")
     assert(conf.get(seq) === Seq("1", "2", "3", "4"))
     conf.set(seq, Seq("1", "2"))
@@ -99,7 +99,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: int seq") {
     val conf = new SparkConf()
-    val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+    val seq = ConfigBuilder("spark.seq").intConf.toSequence.createWithDefault(Seq())
     conf.set(seq.key, "1,,2, 3 , , 4")
     assert(conf.get(seq) === Seq(1, 2, 3, 4))
     conf.set(seq, Seq(1, 2))
@@ -111,7 +111,7 @@ class ConfigEntrySuite extends SparkFunSuite {
     val transformationConf = ConfigBuilder("spark.transformation")
       .stringConf
       .transform(_.toLowerCase())
-      .withDefault("FOO")
+      .createWithDefault("FOO")
 
     assert(conf.get(transformationConf) === "foo")
     conf.set(transformationConf, "BAR")
@@ -123,7 +123,7 @@ class ConfigEntrySuite extends SparkFunSuite {
     val enum = ConfigBuilder("spark.enum")
       .stringConf
       .checkValues(Set("a", "b", "c"))
-      .withDefault("a")
+      .createWithDefault("a")
     assert(conf.get(enum) === "a")
 
     conf.set(enum, "b")
@@ -138,7 +138,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: conversion error") {
     val conf = new SparkConf()
-    val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+    val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.createOptional
     conf.set(conversionTest.key, "abc")
     val conversionError = intercept[IllegalArgumentException] {
       conf.get(conversionTest)
@@ -148,7 +148,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("default value handling is null-safe") {
     val conf = new SparkConf()
-    val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+    val stringConf = ConfigBuilder("spark.string").stringConf.createWithDefault(null)
     assert(conf.get(stringConf) === null)
   }
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b32480b1646bffec231a5b671fd79b62f6fa10cc..60124ef0a13bc7a21485fdb3d3761afae76c0f95 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -20,6 +20,7 @@ import java.nio.file.Files
 
 import scala.util.Properties
 import scala.collection.JavaConverters._
+import scala.collection.mutable.Stack
 
 import sbt._
 import sbt.Classpaths.publishTask
@@ -742,8 +743,21 @@ object TestSettings {
     parallelExecution in Test := false,
     // Make sure the test temp directory exists.
     resourceGenerators in Test <+= resourceManaged in Test map { outDir: File =>
-      if (!new File(testTempDir).isDirectory()) {
-        require(new File(testTempDir).mkdirs(), s"Error creating temp directory $testTempDir.")
+      var dir = new File(testTempDir)
+      if (!dir.isDirectory()) {
+        // Because File.mkdirs() can fail if multiple callers are trying to create the same
+        // parent directory, this code tries to create parents one at a time, and avoids
+        // failures when the directories have been created by somebody else.
+        val stack = new Stack[File]()
+        while (!dir.isDirectory()) {
+          stack.push(dir)
+          dir = dir.getParentFile()
+        }
+
+        while (stack.nonEmpty) {
+          val d = stack.pop()
+          require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
+        }
       }
       Seq[File]()
     },
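
The SparkBuild change above is easier to follow pulled out of the sbt setting; a standalone sketch of the same idea (the helper name is mine, not part of the patch):

    import java.io.File
    import scala.collection.mutable.Stack

    // Race-tolerant replacement for File.mkdirs(): walk up to the first existing ancestor,
    // then create the missing directories one level at a time, treating "already exists"
    // as success so concurrent builds do not fail on each other's directories.
    def mkdirsTolerant(target: File): Unit = {
      var dir = target
      val stack = new Stack[File]()
      while (dir != null && !dir.isDirectory()) {
        stack.push(dir)
        dir = dir.getParentFile()
      }
      while (stack.nonEmpty) {
        val d = stack.pop()
        require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
      }
    }
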
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 587ba1ea058a2d013478c0ae73cc9612165eb095..1c9cb79ba4a5e0256bf5f556b073e32cf8647e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst._
@@ -41,7 +42,6 @@ import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -138,7 +138,7 @@ class SQLContext private[sql](
   def setConf(props: Properties): Unit = conf.setConf(props)
 
   /** Set the given Spark SQL configuration property. */
-  private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
 
   /**
    * Set the given Spark SQL configuration property.
@@ -158,16 +158,16 @@ class SQLContext private[sql](
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue` in [[SQLConfEntry]].
+   * yet, return `defaultValue` in [[ConfigEntry]].
    */
-  private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+  private[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
    * desired one.
    */
-  private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+  private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
     conf.getConf(entry, defaultValue)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a7c0be63fcc3a19d9c2363f6be4a4a9ce7ced50d..927af8994959b532212bcba6b6af5c95b29ab351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -25,6 +25,8 @@ import scala.collection.immutable
 import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.util.Utils
 
@@ -36,418 +38,305 @@ import org.apache.spark.util.Utils
 object SQLConf {
 
   private val sqlConfEntries = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, SQLConfEntry[_]]())
+    new java.util.HashMap[String, ConfigEntry[_]]())
 
-  /**
-   * An entry contains all meta information for a configuration.
-   *
-   * @param key the key for the configuration
-   * @param defaultValue the default value for the configuration
-   * @param valueConverter how to convert a string to the value. It should throw an exception if the
-   *                       string does not have the required format.
-   * @param stringConverter how to convert a value to a string that the user can use it as a valid
-   *                        string value. It's usually `toString`. But sometimes, a custom converter
-   *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
-   *                        `List(a, b, c)`.
-   * @param doc the document for the configuration
-   * @param isPublic if this configuration is public to the user. If it's `false`, this
-   *                 configuration is only used internally and we should not expose it to the user.
-   * @tparam T the value type
-   */
-  class SQLConfEntry[T] private(
-      val key: String,
-      val defaultValue: Option[T],
-      val valueConverter: String => T,
-      val stringConverter: T => String,
-      val doc: String,
-      val isPublic: Boolean) {
-
-    def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
-
-    override def toString: String = {
-      s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
-    }
+  private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+    require(!sqlConfEntries.containsKey(entry.key),
+      s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
+    sqlConfEntries.put(entry.key, entry)
   }
 
-  object SQLConfEntry {
-
-    private def apply[T](
-          key: String,
-          defaultValue: Option[T],
-          valueConverter: String => T,
-          stringConverter: T => String,
-          doc: String,
-          isPublic: Boolean): SQLConfEntry[T] =
-      sqlConfEntries.synchronized {
-        if (sqlConfEntries.containsKey(key)) {
-          throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
-        }
-        val entry =
-          new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
-        sqlConfEntries.put(key, entry)
-        entry
-      }
-
-    def intConf(
-          key: String,
-          defaultValue: Option[Int] = None,
-          doc: String = "",
-          isPublic: Boolean = true): SQLConfEntry[Int] =
-      SQLConfEntry(key, defaultValue, { v =>
-        try {
-          v.toInt
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalArgumentException(s"$key should be int, but was $v")
-        }
-      }, _.toString, doc, isPublic)
-
-    def longConf(
-        key: String,
-        defaultValue: Option[Long] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Long] =
-      SQLConfEntry(key, defaultValue, { v =>
-        try {
-          v.toLong
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalArgumentException(s"$key should be long, but was $v")
-        }
-      }, _.toString, doc, isPublic)
-
-    def longMemConf(
-        key: String,
-        defaultValue: Option[Long] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Long] =
-      SQLConfEntry(key, defaultValue, { v =>
-        try {
-          v.toLong
-        } catch {
-          case _: NumberFormatException =>
-            try {
-              Utils.byteStringAsBytes(v)
-            } catch {
-              case _: NumberFormatException =>
-                throw new IllegalArgumentException(s"$key should be long, but was $v")
-            }
-        }
-      }, _.toString, doc, isPublic)
-
-    def doubleConf(
-        key: String,
-        defaultValue: Option[Double] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Double] =
-      SQLConfEntry(key, defaultValue, { v =>
-        try {
-          v.toDouble
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalArgumentException(s"$key should be double, but was $v")
-        }
-      }, _.toString, doc, isPublic)
-
-    def booleanConf(
-        key: String,
-        defaultValue: Option[Boolean] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Boolean] =
-      SQLConfEntry(key, defaultValue, { v =>
-        try {
-          v.toBoolean
-        } catch {
-          case _: IllegalArgumentException =>
-            throw new IllegalArgumentException(s"$key should be boolean, but was $v")
-        }
-      }, _.toString, doc, isPublic)
-
-    def stringConf(
-        key: String,
-        defaultValue: Option[String] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[String] =
-      SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
-
-    def enumConf[T](
-        key: String,
-        valueConverter: String => T,
-        validValues: Set[T],
-        defaultValue: Option[T] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[T] =
-      SQLConfEntry(key, defaultValue, v => {
-        val _v = valueConverter(v)
-        if (!validValues.contains(_v)) {
-          throw new IllegalArgumentException(
-            s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
-        }
-        _v
-      }, _.toString, doc, isPublic)
-
-    def seqConf[T](
-        key: String,
-        valueConverter: String => T,
-        defaultValue: Option[Seq[T]] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
-      SQLConfEntry(
-        key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
-    }
+  private[sql] object SQLConfigBuilder {
 
-    def stringSeqConf(
-        key: String,
-        defaultValue: Option[Seq[String]] = None,
-        doc: String = "",
-        isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
-      seqConf(key, s => s, defaultValue, doc, isPublic)
-    }
-  }
+    def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)
 
-  import SQLConfEntry._
+  }
 
-  val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
-    defaultValue = Some(true),
-    doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+  val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
+    .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
       "through the constructor (new SQLContexts/HiveContexts created through newSession " +
       "method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
       "a SQLContext/HiveContext has been created, changing the value of this conf will not " +
-      "have effect.",
-    isPublic = true)
-
-  val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
-    defaultValue = Some(true),
-    doc = "When set to true Spark SQL will automatically select a compression codec for each " +
-      "column based on statistics of the data.",
-    isPublic = false)
-
-  val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
-    defaultValue = Some(10000),
-    doc = "Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
-      "memory utilization and compression, but risk OOMs when caching data.",
-    isPublic = false)
+      "have effect.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
+    .internal()
+    .doc("When set to true Spark SQL will automatically select a compression codec for each " +
+      "column based on statistics of the data.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
+    .internal()
+    .doc("Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
+      "memory utilization and compression, but risk OOMs when caching data.")
+    .intConf
+    .createWithDefault(10000)
 
   val IN_MEMORY_PARTITION_PRUNING =
-    booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
-      defaultValue = Some(true),
-      doc = "When true, enable partition pruning for in-memory columnar tables.",
-      isPublic = false)
-
-  val PREFER_SORTMERGEJOIN = booleanConf("spark.sql.join.preferSortMergeJoin",
-      defaultValue = Some(true),
-      doc = "When true, prefer sort merge join over shuffle hash join.",
-      isPublic = false)
-
-  val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
-    defaultValue = Some(10 * 1024 * 1024),
-    doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+    SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
+      .internal()
+      .doc("When true, enable partition pruning for in-memory columnar tables.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
+    .internal()
+    .doc("When true, prefer sort merge join over shuffle hash join.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
+    .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
       "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
       "Note that currently statistics are only supported for Hive Metastore tables where the " +
       "command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+    .intConf
+    .createWithDefault(10 * 1024 * 1024)
 
-  val DEFAULT_SIZE_IN_BYTES = longConf(
-    "spark.sql.defaultSizeInBytes",
-    doc = "The default table size used in query planning. By default, it is set to a larger " +
+  val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
+    .internal()
+    .doc("The default table size used in query planning. By default, it is set to a larger " +
       "value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
       "by default the optimizer will not choose to broadcast a table unless it knows for sure " +
-      "its size is small enough.",
-    isPublic = false)
+      "its size is small enough.")
+    .longConf
+    .createWithDefault(-1)
 
-  val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
-    defaultValue = Some(200),
-    doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
+  val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
+    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+    .intConf
+    .createWithDefault(200)
 
   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
-      defaultValue = Some(64 * 1024 * 1024),
-      doc = "The target post-shuffle input size in bytes of a task.")
+    SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+      .doc("The target post-shuffle input size in bytes of a task.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(64 * 1024 * 1024)
 
-  val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled",
-    defaultValue = Some(false),
-    doc = "When true, enable adaptive query execution.")
+  val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled")
+    .doc("When true, enable adaptive query execution.")
+    .booleanConf
+    .createWithDefault(false)
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    intConf("spark.sql.adaptive.minNumPostShufflePartitions",
-      defaultValue = Some(-1),
-      doc = "The advisory minimal number of post-shuffle partitions provided to " +
+    SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions")
+      .internal()
+      .doc("The advisory minimal number of post-shuffle partitions provided to " +
         "ExchangeCoordinator. This setting is used in our test to make sure we " +
         "have enough parallelism to expose issues that will not be exposed with a " +
         "single partition. When the value is a non-positive value, this setting will " +
-        "not be provided to ExchangeCoordinator.",
-      isPublic = false)
-
-  val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
-    defaultValue = Some(true),
-    doc = "When true, common subexpressions will be eliminated.",
-    isPublic = false)
-
-  val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
-    defaultValue = Some(true),
-    doc = "Whether the query analyzer should be case sensitive or not.")
-
-  val USE_FILE_SCAN = booleanConf("spark.sql.sources.fileScan",
-    defaultValue = Some(true),
-    doc = "Use the new FileScanRDD path for reading HDSF based data sources.",
-    isPublic = false)
-
-  val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
-    defaultValue = Some(false),
-    doc = "When true, the Parquet data source merges schemas collected from all data files, " +
-          "otherwise the schema is picked from the summary file or a random data file " +
-          "if no summary file is available.")
-
-  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
-    defaultValue = Some(false),
-    doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
-          "summary files and we will ignore them when merging schema. Otherwise, if this is " +
-          "false, which is the default, we will merge all part-files. This should be considered " +
-          "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
-
-  val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
-    defaultValue = Some(false),
-    doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+        "not be provided to ExchangeCoordinator.")
+      .intConf
+      .createWithDefault(-1)
+
+  val SUBEXPRESSION_ELIMINATION_ENABLED =
+    SQLConfigBuilder("spark.sql.subexpressionElimination.enabled")
+      .internal()
+      .doc("When true, common subexpressions will be eliminated.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive")
+    .doc("Whether the query analyzer should be case sensitive or not.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
+    .internal()
+    .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
+    .doc("When true, the Parquet data source merges schemas collected from all data files, " +
+         "otherwise the schema is picked from the summary file or a random data file " +
+         "if no summary file is available.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
+    .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
+         "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+         "false, which is the default, we will merge all part-files. This should be considered " +
+         "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
+    .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
       "Spark SQL, do not differentiate between binary data and strings when writing out the " +
       "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
       "compatibility with these systems.")
+    .booleanConf
+    .createWithDefault(false)
 
-  val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
-    defaultValue = Some(true),
-    doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+  val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
+    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
       "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
       "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
       "provide compatibility with these systems.")
+    .booleanConf
+    .createWithDefault(true)
 
-  val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
-    defaultValue = Some(true),
-    doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+  val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
+    .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+    .booleanConf
+    .createWithDefault(true)
 
-  val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
-    valueConverter = v => v.toLowerCase,
-    validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
-    defaultValue = Some("gzip"),
-    doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " +
+  val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
+    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
       "uncompressed, snappy, gzip, lzo.")
-
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
-    defaultValue = Some(true),
-    doc = "Enables Parquet filter push-down optimization when set to true.")
-
-  val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
-    key = "spark.sql.parquet.writeLegacyFormat",
-    defaultValue = Some(false),
-    doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+    .stringConf
+    .transform(_.toLowerCase())
+    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+    .createWithDefault("gzip")
+
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
+    .doc("Enables Parquet filter push-down optimization when set to true.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
+    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
       "Spark SQL schema and vice versa.")
+    .booleanConf
+    .createWithDefault(false)
 
-  val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
-    key = "spark.sql.parquet.output.committer.class",
-    defaultValue = Some(classOf[ParquetOutputCommitter].getName),
-    doc = "The output committer class used by Parquet. The specified class needs to be a " +
+  val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
+    .doc("The output committer class used by Parquet. The specified class needs to be a " +
       "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
       "of org.apache.parquet.hadoop.ParquetOutputCommitter.  NOTE: 1. Instead of SQLConf, this " +
       "option must be set in Hadoop Configuration.  2. This option overrides " +
       "\"spark.sql.sources.outputCommitterClass\".")
-
-  val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
-    key = "spark.sql.parquet.enableVectorizedReader",
-    defaultValue = Some(true),
-    doc = "Enables vectorized parquet decoding.")
-
-  val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
-    defaultValue = Some(false),
-    doc = "When true, enable filter pushdown for ORC files.")
-
-  val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
-    defaultValue = Some(false),
-    doc = "When true, check all the partition paths under the table\'s root directory " +
-          "when reading data stored in HDFS.")
-
-  val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
-    defaultValue = Some(false),
-    doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
-          "unmatching partitions can be eliminated earlier.")
-
-  val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
-    defaultValue = Some(true),
-    doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands.  " +
-          "Note that this function is experimental and should ony be used when you are using " +
-          "non-hive-compatible tables written by Spark SQL.  The SQL string used to create " +
-          "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
-          "possible, or you may get wrong result.",
-    isPublic = false)
-
-  val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
-    defaultValue = Some(true),
-    doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
-          "CREATE VIEW statement using SQL query string generated from view definition logical " +
-          "plan.  If the logical plan doesn't have a SQL representation, we fallback to the " +
-          "original native view implementation.",
-    isPublic = false)
-
-  val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
-    defaultValue = Some("_corrupt_record"),
-    doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
-
-  val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
-    defaultValue = Some(5 * 60),
-    doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
+    .stringConf
+    .createWithDefault(classOf[ParquetOutputCommitter].getName)
+
+  val PARQUET_VECTORIZED_READER_ENABLED =
+    SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
+      .doc("Enables vectorized parquet decoding.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
+    .doc("When true, enable filter pushdown for ORC files.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
+    .doc("When true, check all the partition paths under the table\'s root directory " +
+         "when reading data stored in HDFS.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val HIVE_METASTORE_PARTITION_PRUNING =
+    SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
+      .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
+           "unmatching partitions can be eliminated earlier.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
+    .internal()
+    .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands.  " +
+         "Note that this function is experimental and should ony be used when you are using " +
+         "non-hive-compatible tables written by Spark SQL.  The SQL string used to create " +
+         "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+         "possible, or you may get wrong result.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
+    .internal()
+    .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+         "CREATE VIEW statement using SQL query string generated from view definition logical " +
+         "plan.  If the logical plan doesn't have a SQL representation, we fallback to the " +
+         "original native view implementation.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
+    .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
+    .stringConf
+    .createWithDefault("_corrupt_record")
+
+  val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
+    .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
+    .intConf
+    .createWithDefault(5 * 60)
 
   // This is only used for the thriftserver
-  val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
-    doc = "Set a Fair Scheduler pool for a JDBC client session.")
-
-  val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
-    defaultValue = Some(200),
-    doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
-
-  val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
-    defaultValue = Some(200),
-    doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+  val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
+    .doc("Set a Fair Scheduler pool for a JDBC client session.")
+    .stringConf
+    .createOptional
+
+  val THRIFTSERVER_UI_STATEMENT_LIMIT =
+    SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
+      .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
+      .intConf
+      .createWithDefault(200)
+
+  val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
+    .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+    .intConf
+    .createWithDefault(200)
 
   // This is used to set the default data source
-  val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
-    defaultValue = Some("org.apache.spark.sql.parquet"),
-    doc = "The default data source to use in input/output.")
+  val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
+    .doc("The default data source to use in input/output.")
+    .stringConf
+    .createWithDefault("org.apache.spark.sql.parquet")
 
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
   // to its length exceeds the threshold.
-  val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
-    defaultValue = Some(4000),
-    doc = "The maximum length allowed in a single cell when " +
-      "storing additional schema information in Hive's metastore.",
-    isPublic = false)
-
-  val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
-    defaultValue = Some(true),
-    doc = "When true, automatically discover data partitions.")
+  val SCHEMA_STRING_LENGTH_THRESHOLD =
+    SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
+      .doc("The maximum length allowed in a single cell when " +
+        "storing additional schema information in Hive's metastore.")
+      .internal()
+      .intConf
+      .createWithDefault(4000)
+
+  val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
+    .doc("When true, automatically discover data partitions.")
+    .booleanConf
+    .createWithDefault(true)
 
   val PARTITION_COLUMN_TYPE_INFERENCE =
-    booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
-      defaultValue = Some(true),
-      doc = "When true, automatically infer the data types for partitioned columns.")
+    SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
+      .doc("When true, automatically infer the data types for partitioned columns.")
+      .booleanConf
+      .createWithDefault(true)
 
   val PARTITION_MAX_FILES =
-    intConf("spark.sql.sources.maxConcurrentWrites",
-      defaultValue = Some(1),
-      doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+    SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
+      .doc("The maximum number of concurrent files to open before falling back on sorting when " +
             "writing out files using dynamic partitioning.")
-
-  val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
-    defaultValue = Some(true),
-    doc = "When false, we will treat bucketed table as normal table.")
-
-  val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
-    defaultValue = Some(true),
-    doc = "When true, the ordinal numbers are treated as the position in the select list. " +
-          "When false, the ordinal numbers in order/sort By clause are ignored.")
-
-  val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal",
-    defaultValue = Some(true),
-    doc = "When true, the ordinal numbers in group by clauses are treated as the position " +
+      .intConf
+      .createWithDefault(1)
+
+  val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
+    .doc("When false, we will treat bucketed table as normal table")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
+    .doc("When true, the ordinal numbers are treated as the position in the select list. " +
+         "When false, the ordinal numbers in order/sort By clause are ignored.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
+    .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
       "in the select list. When false, the ordinal numbers are ignored.")
+    .booleanConf
+    .createWithDefault(true)
 
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
@@ -457,89 +346,95 @@ object SQLConf {
   //  1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
   //  2. This option can be overridden by "spark.sql.parquet.output.committer.class".
   val OUTPUT_COMMITTER_CLASS =
-    stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
+    SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
 
-  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
-    key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
-    defaultValue = Some(32),
-    doc = "The degree of parallelism for schema merging and partition discovery of " +
-      "Parquet data sources.")
+  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
+    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
+      .doc("The degree of parallelism for schema merging and partition discovery of " +
+        "Parquet data sources.")
+      .intConf
+      .createWithDefault(32)
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
-  val DATAFRAME_EAGER_ANALYSIS = booleanConf(
-    "spark.sql.eagerAnalysis",
-    defaultValue = Some(true),
-    doc = "When true, eagerly applies query analysis on DataFrame operations.",
-    isPublic = false)
+  val DATAFRAME_EAGER_ANALYSIS = SQLConfigBuilder("spark.sql.eagerAnalysis")
+    .internal()
+    .doc("When true, eagerly applies query analysis on DataFrame operations.")
+    .booleanConf
+    .createWithDefault(true)
 
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
-  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
-    "spark.sql.selfJoinAutoResolveAmbiguity",
-    defaultValue = Some(true),
-    isPublic = false)
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+    SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
 
   // Whether to retain group by columns or not in GroupedData.agg.
-  val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
-    "spark.sql.retainGroupColumns",
-    defaultValue = Some(true),
-    isPublic = false)
-
-  val DATAFRAME_PIVOT_MAX_VALUES = intConf(
-    "spark.sql.pivotMaxValues",
-    defaultValue = Some(10000),
-    doc = "When doing a pivot without specifying values for the pivot column this is the maximum " +
-      "number of (distinct) values that will be collected without error."
-  )
-
-  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
-    defaultValue = Some(true),
-    isPublic = false,
-    doc = "When true, we could use `datasource`.`path` as table in SQL query."
-  )
-
-  val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage",
-    defaultValue = Some(true),
-    doc = "When true, the whole stage (of multiple operators) will be compiled into single java" +
-      " method.",
-    isPublic = false)
-
-  val FILES_MAX_PARTITION_BYTES = longConf("spark.sql.files.maxPartitionBytes",
-    defaultValue = Some(128 * 1024 * 1024), // parquet.block.size
-    doc = "The maximum number of bytes to pack into a single partition when reading files.",
-    isPublic = true)
-
-  val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
-    defaultValue = Some(4 * 1024 * 1024),
-    doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+  val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
+    .internal()
+    .booleanConf
+    .createWithDefault(true)
+
+  val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
+    .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
+      "number of (distinct) values that will be collected without error.")
+    .intConf
+    .createWithDefault(10000)
+
+  val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
+    .internal()
+    .doc("When true, we could use `datasource`.`path` as table in SQL query.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
+    .internal()
+    .doc("When true, the whole stage (of multiple operators) will be compiled into single java" +
+      " method.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
+    .doc("The maximum number of bytes to pack into a single partition when reading files.")
+    .longConf
+    .createWithDefault(128 * 1024 * 1024) // parquet.block.size
+
+  val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
+    .internal()
+    .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
       " the same time. This is used when putting multiple files into a partition. It's better to" +
       " over estimated, then the partitions with small files will be faster than partitions with" +
-      " bigger files (which is scheduled first).",
-    isPublic = false)
-
-  val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
-    defaultValue = Some(true),
-    doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
-    isPublic = false)
-
-  val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = intConf(
-    "spark.sql.streaming.stateStore.minDeltasForSnapshot",
-    defaultValue = Some(10),
-    doc = "Minimum number of state store delta files that needs to be generated before they " +
-      "consolidated into snapshots.",
-    isPublic = false)
-
-  val STATE_STORE_MIN_VERSIONS_TO_RETAIN = intConf(
-    "spark.sql.streaming.stateStore.minBatchesToRetain",
-    defaultValue = Some(2),
-    doc = "Minimum number of versions of a state store's data to retain after cleaning.",
-    isPublic = false)
-
-  val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
-    defaultValue = None,
-    doc = "The default location for storing checkpoint data for continuously executing queries.",
-    isPublic = true)
+      " bigger files (which is scheduled first).")
+    .longConf
+    .createWithDefault(4 * 1024 * 1024)
+
+  val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
+    .internal()
+    .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
+    SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
+      .internal()
+      .doc("Minimum number of state store delta files that needs to be generated before they " +
+        "consolidated into snapshots.")
+      .intConf
+      .createWithDefault(10)
+
+  val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
+    SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
+      .internal()
+      .doc("Minimum number of versions of a state store's data to retain after cleaning.")
+      .intConf
+      .createWithDefault(2)
+
+  val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
+    .doc("The default location for storing checkpoint data for continuously executing queries.")
+    .stringConf
+    .createOptional
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -562,7 +457,7 @@ object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-class SQLConf extends Serializable with CatalystConf with Logging {
+private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -686,7 +581,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
   }
 
   /** Set the given Spark SQL configuration property. */
-  def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+  def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
     require(entry != null, "entry cannot be null")
     require(value != null, s"value cannot be null for key: ${entry.key}")
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
@@ -706,24 +601,34 @@ class SQLConf extends Serializable with CatalystConf with Logging {
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
    * desired one.
    */
-  def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+  def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
     Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
   }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue` in [[SQLConfEntry]].
+   * yet, return `defaultValue` in [[ConfigEntry]].
    */
-  def getConf[T](entry: SQLConfEntry[T]): T = {
+  def getConf[T](entry: ConfigEntry[T]): T = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
     Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
       getOrElse(throw new NoSuchElementException(entry.key))
   }
 
+  /**
+   * Return the value of an optional Spark SQL configuration property for the given key. If the key
+   * is not set yet, throw an exception.
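+   *
+   * For example (illustrative): `getConf(CHECKPOINT_LOCATION)` returns the configured checkpoint
+   * location when it has been set, and throws a `NoSuchElementException` otherwise.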
+   */
+  def getConf[T](entry: OptionalConfigEntry[T]): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    Option(settings.get(entry.key)).map(entry.rawValueConverter).
+      getOrElse(throw new NoSuchElementException(entry.key))
+  }
+
   /**
    * Return the `string` value of Spark SQL configuration property for the given key. If the key is
    * not set yet, return `defaultValue`.
@@ -765,7 +670,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
     settings.remove(key)
   }
 
-  def unsetConf(entry: SQLConfEntry[_]): Unit = {
+  private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = {
     settings.remove(entry.key)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 2b89fa9f23815b9bbb0a3cc978dcea3d01a41097..cc6919913948d0aca7f9e74060edb52074225e05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -26,7 +26,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("intConf") {
     val key = "spark.sql.SQLConfEntrySuite.int"
-    val confEntry = SQLConfEntry.intConf(key)
+    val confEntry = SQLConfigBuilder(key).intConf.createWithDefault(1)
     assert(conf.getConf(confEntry, 5) === 5)
 
     conf.setConf(confEntry, 10)
@@ -45,7 +45,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("longConf") {
     val key = "spark.sql.SQLConfEntrySuite.long"
-    val confEntry = SQLConfEntry.longConf(key)
+    val confEntry = SQLConfigBuilder(key).longConf.createWithDefault(1L)
     assert(conf.getConf(confEntry, 5L) === 5L)
 
     conf.setConf(confEntry, 10L)
@@ -64,7 +64,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("booleanConf") {
     val key = "spark.sql.SQLConfEntrySuite.boolean"
-    val confEntry = SQLConfEntry.booleanConf(key)
+    val confEntry = SQLConfigBuilder(key).booleanConf.createWithDefault(true)
     assert(conf.getConf(confEntry, false) === false)
 
     conf.setConf(confEntry, true)
@@ -83,7 +83,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("doubleConf") {
     val key = "spark.sql.SQLConfEntrySuite.double"
-    val confEntry = SQLConfEntry.doubleConf(key)
+    val confEntry = SQLConfigBuilder(key).doubleConf.createWithDefault(1d)
     assert(conf.getConf(confEntry, 5.0) === 5.0)
 
     conf.setConf(confEntry, 10.0)
@@ -102,7 +102,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("stringConf") {
     val key = "spark.sql.SQLConfEntrySuite.string"
-    val confEntry = SQLConfEntry.stringConf(key)
+    val confEntry = SQLConfigBuilder(key).stringConf.createWithDefault(null)
     assert(conf.getConf(confEntry, "abc") === "abc")
 
     conf.setConf(confEntry, "abcd")
@@ -116,7 +116,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("enumConf") {
     val key = "spark.sql.SQLConfEntrySuite.enum"
-    val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+    val confEntry = SQLConfigBuilder(key)
+      .stringConf
+      .checkValues(Set("a", "b", "c"))
+      .createWithDefault("a")
     assert(conf.getConf(confEntry) === "a")
 
     conf.setConf(confEntry, "b")
@@ -135,8 +138,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
 
   test("stringSeqConf") {
     val key = "spark.sql.SQLConfEntrySuite.stringSeq"
-    val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
-      defaultValue = Some(Nil))
+    val confEntry = SQLConfigBuilder(key)
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
     assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
 
     conf.setConf(confEntry, Seq("a", "b", "c", "d"))
@@ -147,4 +152,12 @@ class SQLConfEntrySuite extends SparkFunSuite {
     assert(conf.getConfString(key) === "a,b,c,d,e")
     assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
   }
+
+  test("duplicate entry") {
+    val key = "spark.sql.SQLConfEntrySuite.duplicate"
+    SQLConfigBuilder(key).stringConf.createOptional
+    intercept[IllegalArgumentException] {
+      SQLConfigBuilder(key).stringConf.createOptional
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e944d328a3ab7ff96ed27faec8a8a108bc5f132f..e687e6a5cefe9faab820512f2ab70d46a71facba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -119,15 +119,10 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     }
 
     intercept[IllegalArgumentException] {
-      // This value less than Int.MinValue
+      // This value is less than Long.MinValue
       sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
     }
 
-    // Test invalid input
-    intercept[IllegalArgumentException] {
-      // This value exceeds Long.MaxValue
-      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
-    }
     sqlContext.conf.clear()
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 073b954a5f8ce3b87cf7dd4e63eaafa7b42c7555..505e5c0bb62f187ffd930aa3874052b23ef81e7b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -54,8 +55,7 @@ import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -318,7 +318,7 @@ class HiveContext private[hive](
     hiveconf.set(key, value)
   }
 
-  override private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+  override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
     setConf(entry.key, entry.stringConverter(value))
   }
 
@@ -413,19 +413,19 @@ private[hive] object HiveContext extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  val HIVE_METASTORE_VERSION = stringConf("spark.sql.hive.metastore.version",
-    defaultValue = Some(hiveExecutionVersion),
-    doc = "Version of the Hive metastore. Available options are " +
+  val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
+    .doc("Version of the Hive metastore. Available options are " +
         s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
+    .stringConf
+    .createWithDefault(hiveExecutionVersion)
 
-  val HIVE_EXECUTION_VERSION = stringConf(
-    key = "spark.sql.hive.version",
-    defaultValue = Some(hiveExecutionVersion),
-    doc = "Version of Hive used internally by Spark SQL.")
+  val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
+    .doc("Version of Hive used internally by Spark SQL.")
+    .stringConf
+    .createWithDefault(hiveExecutionVersion)
 
-  val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
-    defaultValue = Some("builtin"),
-    doc = s"""
+  val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
+    .doc(s"""
       | Location of the jars that should be used to instantiate the HiveMetastoreClient.
       | This property can be one of three options: "
       | 1. "builtin"
@@ -436,49 +436,61 @@ private[hive] object HiveContext extends Logging {
       | 2. "maven"
       |   Use Hive jars of specified version downloaded from Maven repositories.
       | 3. A classpath in the standard format for both Hive and Hadoop.
-    """.stripMargin)
-  val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
-    defaultValue = Some(true),
-    doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
-      "the built in support.")
-
-  val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
-    "spark.sql.hive.convertMetastoreParquet.mergeSchema",
-    defaultValue = Some(false),
-    doc = "When true, also tries to merge possibly different but compatible Parquet schemas in " +
-      "different Parquet data files. This configuration is only effective " +
-      "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+      """.stripMargin)
+    .stringConf
+    .createWithDefault("builtin")
 
-  val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
-    defaultValue = Some(true),
-    doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+  val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
+    .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
       "the built in support.")
-
-  val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
-    defaultValue = Some(false),
-    doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
+    .booleanConf
+    .createWithDefault(true)
+
+  val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
+    SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
+      .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
+        "different Parquet data files. This configuration is only effective " +
+        "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+    .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
       "converted to a data source table, using the data source set by spark.sql.sources.default.")
+    .booleanConf
+    .createWithDefault(false)
 
-  val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
-    defaultValue = Some(jdbcPrefixes),
-    doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+  val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
+    .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+      "the built in support.")
+    .booleanConf
+    .createWithDefault(true)
+
+  val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
+    .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
       "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
       "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
       "classes that need to be shared are those that interact with classes that are already " +
       "shared. For example, custom appenders that are used by log4j.")
+    .stringConf
+    .toSequence
+    .createWithDefault(jdbcPrefixes)
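+    // The default is the jdbcPrefixes sequence defined just below; because of toSequence the
+    // user-facing value is a single comma separated string of class prefixes.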
 
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
 
-  val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
-    defaultValue = Some(Seq()),
-    doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+  val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
+    .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
       "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
       "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
-
-  val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
-    defaultValue = Some(true),
-    doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
+    .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+    .booleanConf
+    .createWithDefault(true)
 
   /**
    * The version of the hive client that will be used to communicate with the metastore.  Note that
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 5188a3e2297ead19d1cd91f895e9b9bb012274ed..8d576bebb062f8a11ced61839b80dc25147a92e2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -31,82 +31,82 @@ package object config {
       "in YARN Application Reports, which can be used for filtering when querying YARN.")
     .stringConf
     .toSequence
-    .optional
+    .createOptional
 
   private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
       .doc("Interval after which AM failures will be considered independent and " +
         "not accumulate towards the attempt count.")
       .timeConf(TimeUnit.MILLISECONDS)
-      .optional
+      .createOptional
 
   private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
     .doc("Maximum number of AM attempts before failing the app.")
     .intConf
-    .optional
+    .createOptional
 
   private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
     .doc("Whether to place user jars in front of Spark's classpath.")
     .booleanConf
-    .withDefault(false)
+    .createWithDefault(false)
 
   private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
     .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
       "with the corresponding path in cluster machines.")
     .stringConf
-    .withDefault(null)
+    .createWithDefault(null)
 
   private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
     .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
       "in the YARN cluster.")
     .stringConf
-    .withDefault(null)
+    .createWithDefault(null)
 
   private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
     .stringConf
-    .withDefault("default")
+    .createWithDefault("default")
 
   private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
     .stringConf
-    .optional
+    .createOptional
 
   /* File distribution. */
 
   private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
     .doc("Location of archive containing jars files with Spark classes.")
     .stringConf
-    .optional
+    .createOptional
 
   private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
     .doc("Location of jars containing Spark classes.")
     .stringConf
     .toSequence
-    .optional
+    .createOptional
 
   private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
   private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
   private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
   private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
     .doc("Whether to preserve temporary files created by the job in HDFS.")
     .booleanConf
-    .withDefault(false)
+    .createWithDefault(false)
 
   private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
     .doc("Replication factor for files uploaded by Spark to HDFS.")
     .intConf
-    .optional
+    .createOptional
 
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
@@ -119,146 +119,146 @@ package object config {
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
       "launcher process.")
     .booleanConf
-    .withDefault(true)
+    .createWithDefault(true)
 
   private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
     .doc("Interval between reports of the current app status in cluster mode.")
     .timeConf(TimeUnit.MILLISECONDS)
-    .withDefaultString("1s")
+    .createWithDefaultString("1s")
 
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
     .timeConf(TimeUnit.MILLISECONDS)
-    .withDefaultString("100s")
+    .createWithDefaultString("100s")
 
   private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
     .doc("Node label expression for the AM.")
     .stringConf
-    .optional
+    .createOptional
 
   private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
     ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
       .intConf
-      .withDefault(25)
+      .createWithDefault(25)
 
   private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
     .intConf
-    .optional
+    .createOptional
 
   private[spark] val MAX_REPORTER_THREAD_FAILURES =
     ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
       .intConf
-      .withDefault(5)
+      .createWithDefault(5)
 
   private[spark] val RM_HEARTBEAT_INTERVAL =
     ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
       .timeConf(TimeUnit.MILLISECONDS)
-      .withDefaultString("3s")
+      .createWithDefaultString("3s")
 
   private[spark] val INITIAL_HEARTBEAT_INTERVAL =
     ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
       .timeConf(TimeUnit.MILLISECONDS)
-      .withDefaultString("200ms")
+      .createWithDefaultString("200ms")
 
   private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
     .doc("A comma-separated list of class names of services to add to the scheduler.")
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
   /* Client-mode AM configuration. */
 
   private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
     .intConf
-    .withDefault(1)
+    .createWithDefault(1)
 
   private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
     .doc("Extra Java options for the client-mode AM.")
     .stringConf
-    .optional
+    .createOptional
 
   private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
     .doc("Extra native library path for the client-mode AM.")
     .stringConf
-    .optional
+    .createOptional
 
   private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
     .bytesConf(ByteUnit.MiB)
-    .optional
+    .createOptional
 
   private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("512m")
+    .createWithDefaultString("512m")
 
   /* Driver configuration. */
 
   private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
     .intConf
-    .withDefault(1)
+    .createWithDefault(1)
 
   private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
     .bytesConf(ByteUnit.MiB)
-    .optional
+    .createOptional
 
   /* Executor configuration. */
 
   private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
     .intConf
-    .withDefault(1)
+    .createWithDefault(1)
 
   private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
     .bytesConf(ByteUnit.MiB)
-    .optional
+    .createOptional
 
   private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
     ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
       .doc("Node label expression for executors.")
       .stringConf
-      .optional
+      .createOptional
 
   /* Security configuration. */
 
   private[spark] val CREDENTIAL_FILE_MAX_COUNT =
     ConfigBuilder("spark.yarn.credentials.file.retention.count")
       .intConf
-      .withDefault(5)
+      .createWithDefault(5)
 
   private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
     ConfigBuilder("spark.yarn.credentials.file.retention.days")
       .intConf
-      .withDefault(5)
+      .createWithDefault(5)
 
   private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
     .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
       "fs.defaultFS does not need to be listed here.")
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
   private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
-    .internal
+    .internal()
     .timeConf(TimeUnit.MILLISECONDS)
-    .optional
+    .createOptional
 
   /* Private configs. */
 
   private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
-    .internal
+    .internal()
     .stringConf
-    .withDefault(null)
+    .createWithDefault(null)
 
   // Internal config to propagate the location of the user's jar to the driver/executors
   private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
-    .internal
+    .internal()
     .stringConf
-    .optional
+    .createOptional
 
   // Internal config to propagate the locations of any extra jars to add to the classpath
   // of the executors
   private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
-    .internal
+    .internal()
     .stringConf
     .toSequence
-    .optional
+    .createOptional
 }