diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 956724b14bba3520e61dfe652e693920d837fa4d..ba7a65f79c41406cc8aeaf8132c2d3a32d39e6ba 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -592,6 +592,8 @@ private[spark] object SparkConf extends Logging {
    *
    * The alternates are used in the order defined in this map. If deprecated configs are
    * present in the user's configuration, a warning is logged.
+   *
+   * TODO: consolidate this map with `ConfigBuilder.withAlternative`.
    */
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index e5d60a7ef0984e7dba4e11020dbeb9cc87fe5e09..515c9c24a9e2f3fca98f255752d88d436fa50b83 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -126,8 +126,8 @@ private[spark] class TypedConfigBuilder[T](
 
   /** Creates a [[ConfigEntry]] that does not have a default value. */
   def createOptional: OptionalConfigEntry[T] = {
-    val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
-      parent._public)
+    val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
+      stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -140,8 +140,8 @@ private[spark] class TypedConfigBuilder[T](
       createWithDefaultString(default.asInstanceOf[String])
     } else {
       val transformedDefault = converter(stringConverter(default))
-      val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
-        stringConverter, parent._doc, parent._public)
+      val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
+        transformedDefault, converter, stringConverter, parent._doc, parent._public)
       parent._onCreate.foreach(_(entry))
       entry
     }
@@ -149,8 +149,8 @@ private[spark] class TypedConfigBuilder[T](
 
   /** Creates a [[ConfigEntry]] with a function to determine the default value */
   def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
-    val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter,
-      stringConverter, parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
+      converter, stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_ (entry))
     entry
   }
@@ -160,8 +160,8 @@ private[spark] class TypedConfigBuilder[T](
    * [[String]] and must be a valid value for the entry.
    */
   def createWithDefaultString(default: String): ConfigEntry[T] = {
-    val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
-      parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
+      converter, stringConverter, parent._doc, parent._public)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -180,6 +180,7 @@ private[spark] case class ConfigBuilder(key: String) {
   private[config] var _public = true
   private[config] var _doc = ""
   private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
+  private[config] var _alternatives = List.empty[String]
 
   def internal(): ConfigBuilder = {
     _public = false
@@ -200,6 +201,11 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  def withAlternative(key: String): ConfigBuilder = {
+    _alternatives = _alternatives :+ key
+    this
+  }
+
   def intConf: TypedConfigBuilder[Int] = {
     new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
   }
@@ -229,7 +235,7 @@ private[spark] case class ConfigBuilder(key: String) {
   }
 
   def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
-    new FallbackConfigEntry(key, _doc, _public, fallback)
+    new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
   }
 
   def regexConf: TypedConfigBuilder[Regex] = {
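
[How the new builder method composes, as exercised by the test added at the end of this patch. A sketch of a declaration as it would appear in org.apache.spark.internal.config (`ConfigBuilder` is `private[spark]`, so this only compiles inside Spark's own config package, and the keys are made up):

    // Alternates accumulate in call order, so reads try "spark.demo.new",
    // then "spark.demo.old1", then "spark.demo.old2", before the default.
    private[spark] val DEMO_CONF = ConfigBuilder("spark.demo.new")
      .withAlternative("spark.demo.old1")
      .withAlternative("spark.demo.old2")
      .intConf
      .createWithDefault(1)
]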
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index e86712e84d6ac040799ba4081f720688008cfb16..f1190289244e977099c6fe95dd54b266f0ad93df 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -41,6 +41,7 @@ package org.apache.spark.internal.config
  */
 private[spark] abstract class ConfigEntry[T] (
     val key: String,
+    val alternatives: List[String],
     val valueConverter: String => T,
     val stringConverter: T => String,
     val doc: String,
@@ -52,6 +53,10 @@ private[spark] abstract class ConfigEntry[T] (
 
   def defaultValueString: String
 
+  protected def readString(reader: ConfigReader): Option[String] = {
+    alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
+  }
+
   def readFrom(reader: ConfigReader): T
 
   def defaultValue: Option[T] = None
@@ -59,63 +64,64 @@ private[spark] abstract class ConfigEntry[T] (
   override def toString: String = {
     s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
   }
-
 }
 
 private class ConfigEntryWithDefault[T] (
     key: String,
+    alternatives: List[String],
     _defaultValue: T,
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(_defaultValue)
 
   override def defaultValueString: String = stringConverter(_defaultValue)
 
   def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(_defaultValue)
+    readString(reader).map(valueConverter).getOrElse(_defaultValue)
   }
 }
 
 private class ConfigEntryWithDefaultFunction[T] (
      key: String,
+     alternatives: List[String],
      _defaultFunction: () => T,
      valueConverter: String => T,
      stringConverter: T => String,
      doc: String,
      isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(_defaultFunction())
 
   override def defaultValueString: String = stringConverter(_defaultFunction())
 
   def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(_defaultFunction())
+    readString(reader).map(valueConverter).getOrElse(_defaultFunction())
   }
 }
 
 private class ConfigEntryWithDefaultString[T] (
     key: String,
+    alternatives: List[String],
     _defaultValue: String,
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+  extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
 
   override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
 
   override def defaultValueString: String = _defaultValue
 
   def readFrom(reader: ConfigReader): T = {
-    val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
+    val value = readString(reader).getOrElse(reader.substitute(_defaultValue))
     valueConverter(value)
   }
-
 }
 
 
@@ -124,19 +130,20 @@ private class ConfigEntryWithDefaultString[T] (
  */
 private[spark] class OptionalConfigEntry[T](
     key: String,
+    alternatives: List[String],
     val rawValueConverter: String => T,
     val rawStringConverter: T => String,
     doc: String,
     isPublic: Boolean)
-  extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
+  extends ConfigEntry[Option[T]](key, alternatives,
+    s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull, doc, isPublic) {
 
   override def defaultValueString: String = "<undefined>"
 
   override def readFrom(reader: ConfigReader): Option[T] = {
-    reader.get(key).map(rawValueConverter)
+    readString(reader).map(rawValueConverter)
   }
-
 }
 
 /**
@@ -144,17 +151,18 @@ private[spark] class OptionalConfigEntry[T](
  */
 private class FallbackConfigEntry[T] (
     key: String,
+    alternatives: List[String],
     doc: String,
     isPublic: Boolean,
     private[config] val fallback: ConfigEntry[T])
-  extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+  extends ConfigEntry[T](key, alternatives,
+    fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
   override def readFrom(reader: ConfigReader): T = {
-    reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
+    readString(reader).map(valueConverter).getOrElse(fallback.readFrom(reader))
   }
-
 }
 
 private[spark] object ConfigEntry {
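
[`readString` above is the single point where alternates apply, shared by every entry subclass. For key = "a" and alternatives = List("b", "c"), the fold unrolls to:

    reader.get("a")
      .orElse(reader.get("b"))
      .orElse(reader.get("c"))
]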
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
index 97f56a64d600f28181c4792993c5c94abd7f4d3b..5d98a1185f053de79043815fd2956bf7e6397ac3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -47,28 +47,16 @@ private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvi
 }
 
 /**
- * A config provider that only reads Spark config keys, and considers default values for known
- * configs when fetching configuration values.
+ * A config provider that only reads Spark config keys.
  */
 private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
 
-  import ConfigEntry._
-
   override def get(key: String): Option[String] = {
     if (key.startsWith("spark.")) {
-      Option(conf.get(key)).orElse(defaultValueString(key))
+      Option(conf.get(key))
     } else {
       None
     }
   }
 
-  private def defaultValueString(key: String): Option[String] = {
-    findEntry(key) match {
-      case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
-      case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => get(e.fallback.key)
-      case _ => None
-    }
-  }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
index c62de9bfd8fc3dcbb82b0d35e5f48fefd53deaf0..c1ab22150d024e337a7fae2b51b50bc19e255597 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -92,7 +92,7 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
         require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")
 
         val replacement = bindings.get(prefix)
-          .flatMap(_.get(name))
+          .flatMap(getOrDefault(_, name))
           .map { v => substitute(v, usedRefs + ref) }
           .getOrElse(m.matched)
         Regex.quoteReplacement(replacement)
@@ -102,4 +102,20 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
     }
   }
 
+  /**
+   * Gets the value of a config from the given `ConfigProvider`. If no value is found for the
+   * config and its registered `ConfigEntry` defines a default value, returns that default.
+   */
+  private def getOrDefault(conf: ConfigProvider, key: String): Option[String] = {
+    conf.get(key).orElse {
+      ConfigEntry.findEntry(key) match {
+        case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
+        case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
+        case e: ConfigEntryWithDefaultFunction[_] => Option(e.defaultValueString)
+        case e: FallbackConfigEntry[_] => getOrDefault(conf, e.fallback.key)
+        case _ => None
+      }
+    }
+  }
+
 }
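
[Default resolution moves from SparkConfigProvider into this `getOrDefault` helper, so variable references like `${spark.x}` still see registered defaults during substitution, and fallback entries recurse to their parent key. A hypothetical illustration (these types are `private[spark]` and the key is made up; creating the entry is what registers it in ConfigEntry's known-configs table):

    val capacity = ConfigBuilder("spark.demo.capacity").intConf.createWithDefault(10000)

    val reader = new ConfigReader(new MapProvider(new java.util.HashMap[String, String]()))
    // The key is unset, so getOrDefault falls back to the entry's default string.
    assert(reader.substitute("queue holds ${spark.demo.capacity}") == "queue holds 10000")
]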
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f8139b706a7cc17e2abddf62e3270e4bdb8df539..4ad04b04c312d36e9f2e30ff09bb694e4b5df57e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -151,9 +151,11 @@ package object config {
       .createOptional
   // End blacklist confs
 
-  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
-    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
+      .withAlternative("spark.scheduler.listenerbus.eventqueue.size")
       .intConf
+      .checkValue(_ > 0, "The capacity of listener bus event queue must be positive")
       .createWithDefault(10000)
 
   // This property sets the root namespace for metrics reporting
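
[The rename keeps the old key working through `withAlternative`, and the `checkValue` guard replaces the hand-rolled validation deleted from LiveListenerBus below. A hypothetical snippet in the spirit of ConfigEntrySuite (it would run inside a SparkFunSuite test, since `conf.get(entry)` is the `private[spark]` typed getter):

    val conf = new SparkConf()
    // The old key is honored when the new one is absent.
    conf.set("spark.scheduler.listenerbus.eventqueue.size", "20000")
    assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 20000)

    // checkValue rejects non-positive capacities with an IllegalArgumentException at read time.
    conf.set("spark.scheduler.listenerbus.eventqueue.capacity", "0")
    intercept[IllegalArgumentException] { conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) }
]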
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 5533f7b1f23634ab18d140579df469986f6d3a79..801dfaa62306ac695f1c0cc10dede690d330ff23 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
@@ -34,23 +34,14 @@ import org.apache.spark.util.Utils
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
 private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
-
   self =>
 
   import LiveListenerBus._
 
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-
-  private def validateAndGetQueueSize(): Int = {
-    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
-    if (queueSize <= 0) {
-      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-    }
-    queueSize
-  }
+  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
+    sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
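
[The `SparkException` guard can go because `checkValue` now rejects non-positive capacities before the queue is ever built. The bound itself exists because `LinkedBlockingQueue.offer` fails fast instead of blocking or OOMing once the queue is full, which is how the bus detects dropped events. A standalone illustration:

    import java.util.concurrent.LinkedBlockingQueue

    val queue = new LinkedBlockingQueue[String](2) // bounded, as in eventQueue above
    assert(queue.offer("e1") && queue.offer("e2"))
    assert(!queue.offer("e3")) // full: offer returns false rather than growing without bound
]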
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index b72cd8be242068d9e07cafcebc9c46df563edaaa..bf08276dbf971721e1d412352a788ab4f439d714 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -261,4 +261,31 @@ class ConfigEntrySuite extends SparkFunSuite {
     data = 2
     assert(conf.get(iConf) === 2)
   }
+
+  test("conf entry: alternative keys") {
+    val conf = new SparkConf()
+    val iConf = ConfigBuilder(testKey("a"))
+      .withAlternative(testKey("b"))
+      .withAlternative(testKey("c"))
+      .intConf.createWithDefault(0)
+
+    // no key is set, so return the default value.
+    assert(conf.get(iConf) === 0)
+
+    // only the primary key is set, so return its value.
+    conf.set(testKey("a"), "1")
+    assert(conf.get(iConf) === 1)
+
+    // the primary key and the alternative keys are all set; the primary key wins.
+    conf.set(testKey("b"), "2")
+    conf.set(testKey("c"), "3")
+    assert(conf.get(iConf) === 1)
+
+    // the primary key is not set but (some of) the alternative keys are, so return the value
+    // of the first alternative key that is set.
+    conf.remove(testKey("a"))
+    assert(conf.get(iConf) === 2)
+    conf.remove(testKey("b"))
+    assert(conf.get(iConf) === 3)
+  }
 }